refactor(gateway): pure-C++ managers via Transport adapters + discovery split + graph-event refresh #397

Open
bburda wants to merge 31 commits into main from feat/medkit-managers-provider-injection

@bburda bburda commented Apr 29, 2026

Pull Request

Summary

This PR is the second pass after #391: each SOVD manager keeps its routing and
business logic, but the ROS 2 binding is extracted behind neutral Transport
adapters. Manager unit tests can now compose with mock transports and link only
against gateway_core, which removes rclcpp from the unit-test link line and
eliminates the executor-lifecycle setup that was the root cause of intermittent
test failures (process spawned but graph not yet seen,
subscription destroyed mid-callback).

The change also folds RuntimeDiscoveryStrategy into the existing
IntrospectionProvider chain (built-in graph queries become just another
provider), relocates TypeIntrospection next to the rest of the
rosidl_typesupport glue in ros2_medkit_serialization, and switches the
runtime-discovery refresh from cyclic polling to rclcpp graph-event driven, with
the timer kept only as a low-frequency safety backstop.

This PR stacks on #392 (feat/medkit-core-scaffold). When #392 merges, GitHub
automatically retargets this PR's base branch to main.


Issue

Link the related issue (required):


Type

  • New feature or tests
  • Bug fix
  • Breaking change
  • Documentation only

Testing

How was this tested / how should reviewers verify it?

The changes were exercised through the full local test suite. Reviewers can
reproduce by running, from the gateway worktree:

  • colcon build --symlink-install - whole workspace compiles cleanly
    (15 packages, no errors)
  • ./scripts/test.sh unit - 2727 unit tests, 0 failures, 0 errors. The
    new mock-transport tests link only against gateway_core + GTest with
    no ament_target_dependencies.
  • ./scripts/test.sh lint --packages-select ros2_medkit_gateway - linters
    pass (clang-format, copyright, cmake-lint, flake8, pep257, xmllint)
  • ./scripts/test.sh integ --packages-select ros2_medkit_integration_tests
    - 4501 tests, 0 failures, including the test_graph_event_discovery
    and test_triggers_persistent features
  • colcon build --packages-select ros2_medkit_gateway --cmake-args -DENABLE_CLANG_TIDY=ON
    then ./scripts/test.sh tidy --packages-select ros2_medkit_gateway -
    clang-tidy clean on every changed file
  • colcon test --packages-select ros2_medkit_gateway --ctest-args -R "gateway_core_smoke"
    - the smoke test links only gateway_core + GTest and proves all eight
    provider interfaces, seven Transport ports, and six manager class names
    are reachable without rclcpp

To verify the discovery latency improvement, launch the gateway, start a demo
node with ros2 run, and confirm the new entity appears in /apps within 500 ms.


Checklist

  • Breaking changes are clearly described (and announced in docs / changelog if needed)
  • Tests were added or updated if needed
  • Docs were updated if behavior or public API changed

Commits

13 commits build up the main change in self-contained, reviewable steps. Each
step keeps the suite green and ends with the project compiling cleanly.

  1. refactor(gateway): relocate operation result types into core/operations
  2. refactor(gateway): relocate parameter result types into core/configuration
  3. refactor(gateway): relocate FaultResult into core/faults
  4. feat(gateway): introduce neutral Transport interfaces under core/transports
  5. refactor(gateway): route DataAccessManager through TopicTransport adapter
  6. refactor(gateway): route OperationManager through Service/Action transports
  7. refactor(gateway): route ConfigurationManager through ParameterTransport
  8. refactor(gateway): route FaultManager through FaultServiceTransport
  9. refactor(gateway): route LogManager through LogSource adapter
  10. refactor(gateway): route TriggerManager through TopicSubscriptionTransport
  11. refactor(gateway): convert runtime discovery to IntrospectionProvider
  12. refactor: relocate TypeIntrospection to ros2_medkit_serialization
  13. refactor(gateway): drive discovery refresh from rclcpp graph events

Two functional fixes, one doc sweep, and a small clang-tidy / formatting
follow-up were folded into the branch during verification:

  • fix(gateway): only sweep orphan triggers on the backstop refresh - the
    graph-event refresh path was running the orphan-trigger sweep during the
    cold-start window, before DDS discovery had populated the entity cache.
    Restored persistent triggers were then treated as orphans and removed from
    the SQLite store. Sweep now runs only from the backstop tick, where the
    cache is guaranteed to reflect a settled DDS view.
  • fix(opcua): bump test_opcua_plugin TIMEOUT to 240s - the suite legitimately
    needs ~90 s wallclock for its 13 OPC UA connection-failure tests; the
    default 60 s timeout truncated the run mid-suite under heavy parallel
    colcon test load.
  • docs(gateway): refresh design index for managers-in-core layout - updates
    the layered-library note, the TriggerTopicSubscriber class summary, and
    the TriggerManager arrow on the class diagram so the design document
    matches the new manager / transport split.
  • fix(gateway): resolve clang-tidy findings on touched files, plus a small
    follow-up commit - together these address every clang-tidy warning the
    incremental PR job emits on the cpp files this branch modifies
    (named-parameter, std::bind -> lambda, single-character find(), copied
    shared_ptr parameters, vector::reserve before emplace_back, and one
    operator+= rewrite in fault_handlers).

@bburda bburda self-assigned this Apr 29, 2026
@bburda bburda force-pushed the feat/medkit-core-scaffold branch from 7b0a134 to 8397e1c Compare May 9, 2026 18:59
bburda added 4 commits May 10, 2026 15:54
Extracts ServiceCallResult, the ActionGoalStatus enum + free
action_status_to_string helper, the four Action*Result structs, and
ActionGoalInfo into a dedicated header under core/operations/. The types
were already neutral C++ but lived behind the rclcpp-pulling
operation_manager.hpp include path, which prevented the neutral build
layer from consuming them. The OperationManager header forwards to the
new location; the helper definition moves alongside into a small core
translation unit so the symbol is provided by gateway_core.
…ation

Extracts ParameterErrorCode and ParameterResult into a dedicated header
under core/configuration/. The structured-error contract becomes
available to the neutral build layer; the ConfigurationManager header
forwards to the new location.

Extracts FaultResult, the JSON-bag outcome shared by seven of the eight
fault-manager methods, into a dedicated header under core/faults/. The
existing FaultWithEnvResult still exposes raw message types and stays
inside fault_manager.hpp until the transport extraction moves the
message-to-JSON conversion to the adapter layer.
…sports

Defines seven abstract ports - TopicTransport, ServiceTransport,
ActionTransport, ParameterTransport, FaultServiceTransport, LogSource,
TopicSubscriptionTransport - that the manager refactors will route through.
Each port carries the same operations the corresponding manager performs
today, expressed in pure C++ types (already-neutral result structs from
core/operations, core/configuration, core/faults). The smoke test extends
to assert each interface is abstract and reachable from the gateway_core
build layer with no ament dependency on the link line. Adapter
implementations land under src/ros2/transports/ in subsequent phases.

Two name choices avoid collisions with structs that currently live in the
ROS-coupled headers:

* TopicTransport::sample returns TopicSample rather than TopicSampleResult,
  because core/data/data_types.hpp already defines a same-named struct
  with a different shape (per-topic batch errors, endpoint QoS) for the
  topic-data-provider plugin surface.
* FaultServiceTransport::get_fault_with_env returns FaultWithEnvJsonResult
  rather than FaultWithEnvResult, because the legacy
  ros2_medkit_msgs-typed FaultWithEnvResult still lives next to the
  FaultManager facade. The two will be reconciled when the FaultManager
  migration lands.

TopicSubscriptionHandle is a polymorphic RAII base (virtual destructor
only) rather than a fully abstract port, so the smoke assertion uses
sizeof > 0 instead of std::is_abstract_v.
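
The difference between the two smoke assertions can be sketched as follows. This is a minimal illustration, not the PR's actual headers: the `TopicSample` fields and the `sample` signature are assumptions; only the class names and the abstract-versus-RAII distinction come from the commit message.

```cpp
#include <string>
#include <type_traits>

// Hypothetical stand-in for the neutral result struct.
struct TopicSample {
  bool success{false};
  std::string data_json;
};

// Fully abstract port: every operation is pure virtual.
class TopicTransport {
 public:
  virtual ~TopicTransport() = default;
  virtual TopicSample sample(const std::string & topic) = 0;  // assumed signature
};

// Polymorphic RAII base: a virtual destructor and nothing else, so the class
// is polymorphic but not abstract.
class TopicSubscriptionHandle {
 public:
  virtual ~TopicSubscriptionHandle() = default;
};

// The port can be pinned with is_abstract_v ...
static_assert(std::is_abstract_v<TopicTransport>, "port must stay abstract");
// ... but the handle has no pure virtuals, so reachability is pinned with sizeof.
static_assert(!std::is_abstract_v<TopicSubscriptionHandle>, "handle has no pure virtuals");
static_assert(sizeof(TopicSubscriptionHandle) > 0, "handle must be a complete type");
```

Because the checks are `static_assert`s, a translation unit like this fails at compile time if a port stops being abstract or a header becomes unreachable.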
Base automatically changed from feat/medkit-core-scaffold to main May 10, 2026 13:55
bburda added 16 commits May 10, 2026 15:55
…pter

The data-access manager body becomes pure C++. All rclcpp use - generic
publisher cache, JSON serializer instance, native sample backend, type
introspection - moves into the new Ros2TopicTransport adapter under
src/ros2/transports/. The manager now takes a shared_ptr<TopicTransport>
plus the sample timeout and routes publish / sample / native-sample
through it. The adapter exposes the type-introspection helper through
the transport interface so handlers retain their existing accessor.

The handler-facing public API (publish_to_topic, get_topic_sample_with_
fallback, get_topic_sample_native, get_type_introspection,
set_topic_data_provider, get_topic_data_provider, get_topic_sample_
timeout) is preserved verbatim. The legacy include path
ros2_medkit_gateway/data_access_manager.hpp is preserved as a forwarding
shim. New mock-transport tests link only against gateway_core + GTest,
proving the manager logic compiles without rclcpp on the link line.
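
A mock-transport test of this shape might look roughly like the sketch below. It assumes a reduced `TopicTransport` with hypothetical `publish` / `sample` signatures and a cut-down manager (`DataAccessManagerSketch`); the real interface is larger and its signatures are not shown in this PR description.

```cpp
#include <cassert>
#include <chrono>
#include <memory>
#include <string>
#include <utility>

// Hypothetical neutral result type.
struct TopicSample {
  bool success{false};
  std::string data_json;
  std::string error;
};

// Reduced port; the signatures are assumptions for illustration.
class TopicTransport {
 public:
  virtual ~TopicTransport() = default;
  virtual bool publish(const std::string & topic, const std::string & json) = 0;
  virtual TopicSample sample(const std::string & topic, std::chrono::milliseconds timeout) = 0;
};

// Pure C++ manager body: validation and routing only, no rclcpp.
class DataAccessManagerSketch {
 public:
  DataAccessManagerSketch(std::shared_ptr<TopicTransport> transport, std::chrono::milliseconds sample_timeout)
  : transport_(std::move(transport)), sample_timeout_(sample_timeout) {
    assert(transport_ != nullptr);
  }

  TopicSample get_topic_sample(const std::string & topic) {
    if (topic.empty() || topic.front() != '/') {
      return TopicSample{false, "", "topic must be an absolute name"};
    }
    return transport_->sample(topic, sample_timeout_);
  }

 private:
  std::shared_ptr<TopicTransport> transport_;
  std::chrono::milliseconds sample_timeout_;
};

// Mock that records how the manager used the port.
class MockTopicTransport : public TopicTransport {
 public:
  bool publish(const std::string &, const std::string &) override {
    ++publish_calls;
    return true;
  }
  TopicSample sample(const std::string & topic, std::chrono::milliseconds timeout) override {
    ++sample_calls;
    last_topic = topic;
    last_timeout = timeout;
    return TopicSample{true, "{\"data\":1}", ""};
  }
  int publish_calls{0};
  int sample_calls{0};
  std::string last_topic;
  std::chrono::milliseconds last_timeout{0};
};
```

A test can then construct the manager with `std::make_shared<MockTopicTransport>()` and assert on the recorded calls, with nothing from ROS 2 on the link line.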
…sports

OperationManager retains all goal-tracking, UUID generation, validation,
and component-namespace resolution logic in pure C++. The rclcpp side -
GenericServiceClient cache, action-internal-service clients, the
GoalStatusArray subscription cache - moves into two new adapters,
Ros2ServiceTransport and Ros2ActionTransport, under src/ros2/transports/.

The manager registers a per-goal status callback at subscribe time so
the action transport pushes goal-status updates back into the manager's
tracking map on its own thread. Goal-status array decoding moves into
the transport; the manager only consumes the typed (path, goal_id,
status) tuples. The handler-facing public API (call_service,
call_component_service, send_action_goal, send_component_action_goal,
cancel_action_goal, get_action_result, list_tracked_goals,
update_goal_status, update_goal_feedback, cleanup_old_goals,
subscribe_to_action_status, unsubscribe_from_action_status, the four
static is_*-type helpers, the UUID helpers) is preserved.

The component-name resolver dependency moves behind a new
ServiceActionResolver port that DiscoveryManager now implements; this
keeps the manager translation unit out of discovery_manager.hpp's
rclcpp transitive include chain. Mock-transport tests link only against
gateway_core + GTest, exercising 13 routing / lifecycle scenarios.
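
The status-callback flow described above could be sketched like this. The `GoalStatus` enum values, the `subscribe_status` signature, and the `GoalTrackerSketch` / `FakeActionTransport` classes are all hypothetical; only the idea that the transport pushes (path, goal_id, status) tuples into the manager's tracking map comes from the commit message.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

enum class GoalStatus { Accepted, Executing, Succeeded, Canceled, Aborted };  // illustrative values

using StatusCallback =
  std::function<void(const std::string & path, const std::string & goal_id, GoalStatus status)>;

// Reduced action port; the signature is an assumption.
class ActionTransport {
 public:
  virtual ~ActionTransport() = default;
  virtual void subscribe_status(const std::string & action_path, StatusCallback callback) = 0;
};

// Manager side: owns the tracking map and only consumes typed tuples.
class GoalTrackerSketch {
 public:
  explicit GoalTrackerSketch(std::shared_ptr<ActionTransport> transport) : transport_(std::move(transport)) {
    assert(transport_ != nullptr);
  }

  void track(const std::string & path, const std::string & goal_id) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      goals_[goal_id] = GoalStatus::Accepted;
    }
    // The callback may run on the transport's thread, hence the mutex.
    transport_->subscribe_status(path, [this](const std::string &, const std::string & id, GoalStatus status) {
      std::lock_guard<std::mutex> lock(mutex_);
      auto it = goals_.find(id);
      if (it != goals_.end()) {
        it->second = status;  // updates for untracked goals are ignored
      }
    });
  }

  GoalStatus status(const std::string & goal_id) const {
    std::lock_guard<std::mutex> lock(mutex_);
    return goals_.at(goal_id);
  }

 private:
  std::shared_ptr<ActionTransport> transport_;
  mutable std::mutex mutex_;
  std::map<std::string, GoalStatus> goals_;
};

// Fake transport that lets a test emit status updates by hand.
class FakeActionTransport : public ActionTransport {
 public:
  void subscribe_status(const std::string & action_path, StatusCallback callback) override {
    callbacks_[action_path] = std::move(callback);
  }
  void emit(const std::string & path, const std::string & goal_id, GoalStatus status) {
    auto it = callbacks_.find(path);
    if (it != callbacks_.end()) {
      it->second(path, goal_id, status);
    }
  }

 private:
  std::map<std::string, StatusCallback> callbacks_;
};
```

In this sketch the tracker must outlive the subscription because the lambda captures `this`; how the real manager handles that lifetime is not described here.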

ConfigurationManager loses its rclcpp::SyncParametersClient cache, the
rclcpp::Parameter defaults cache, the negative-cache for unreachable
nodes, and the spin_mutex serialising parameter-client spins. All of
these move into the new Ros2ParameterTransport adapter under
src/ros2/transports/. The adapter also owns the JSON <-> rclcpp::
ParameterValue conversion helpers and the gateway-own-FQN check that
previously lived as private members of the manager.

Manager retains: per-entity reset orchestration (reset_parameter and
reset_all_parameters compose set_parameter with transport-supplied
defaults via the new get_default and list_defaults methods on
ParameterTransport), the self-node short circuit (delegated to the
transport's is_self_node), and the shutdown ordering contract
(idempotent manager.shutdown() fans out to transport.shutdown() before
rclcpp::shutdown()).

The handler-facing public API (list_parameters, get_parameter,
set_parameter, reset_parameter, reset_all_parameters, shutdown) is
preserved verbatim; the legacy include path
ros2_medkit_gateway/configuration_manager.hpp is preserved as a
forwarding shim. Parameter-service tuning parameters
(parameter_service_timeout_sec, parameter_service_negative_cache_sec)
are declared at gateway_node wiring time and passed to the transport
constructor.

ParameterTransport gains two new pure-virtual methods (get_default,
list_defaults) that surface the cached defaults as neutral JSON for
manager-level reset orchestration. The gateway_core link-time smoke
test now also pins ConfigurationManager. Mock-transport tests link
exclusively against gateway_core + GTest, with no rclcpp on the link
line, covering CRUD delegation, self-node short-circuit, reset via
cached defaults, partial-failure aggregation in reset_all_parameters,
and shutdown idempotency.
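
As a rough illustration of the reset orchestration and the idempotent shutdown, the sketch below composes `set_parameter` with `list_defaults` and aggregates partial failures. The signatures, the string-encoded JSON values, and the `ResetSummary` type are assumptions made for the example; the actual port returns the project's own result structs.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Reduced parameter port; values are JSON text to keep the sketch dependency-free.
class ParameterTransport {
 public:
  virtual ~ParameterTransport() = default;
  virtual bool set_parameter(const std::string & node, const std::string & name, const std::string & value_json) = 0;
  virtual std::map<std::string, std::string> list_defaults(const std::string & node) = 0;
  virtual void shutdown() = 0;
};

// Hypothetical aggregate outcome of a bulk reset.
struct ResetSummary {
  std::vector<std::string> reset;
  std::vector<std::string> failed;
};

class ConfigurationManagerSketch {
 public:
  explicit ConfigurationManagerSketch(std::shared_ptr<ParameterTransport> transport)
  : transport_(std::move(transport)) {
    assert(transport_ != nullptr);
  }

  // Reset every parameter to its cached default; keep going after a failure.
  ResetSummary reset_all_parameters(const std::string & node) {
    ResetSummary summary;
    for (const auto & [name, default_json] : transport_->list_defaults(node)) {
      if (transport_->set_parameter(node, name, default_json)) {
        summary.reset.push_back(name);
      } else {
        summary.failed.push_back(name);
      }
    }
    return summary;
  }

  // Only the first call fans out to the transport.
  void shutdown() {
    if (!shut_down_.exchange(true)) {
      transport_->shutdown();
    }
  }

 private:
  std::shared_ptr<ParameterTransport> transport_;
  std::atomic<bool> shut_down_{false};
};

// Fake transport with one parameter that refuses to be set.
class FakeParameterTransport : public ParameterTransport {
 public:
  bool set_parameter(const std::string &, const std::string & name, const std::string &) override {
    return name != "read_only";
  }
  std::map<std::string, std::string> list_defaults(const std::string &) override {
    return {{"gain", "1.5"}, {"read_only", "true"}};
  }
  void shutdown() override { ++shutdown_calls; }
  int shutdown_calls{0};
};
```

The `exchange(true)` guard is one simple way to make `shutdown()` safe to call twice; the PR does not say which mechanism the manager actually uses.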

FaultManager loses its seven rclcpp::Client members and their seven
per-client mutexes; all move into the new Ros2FaultServiceTransport
adapter under src/ros2/transports/. The adapter performs the
ros2_medkit_msgs to JSON conversion internally and returns the neutral
FaultResult / FaultWithEnvJsonResult structures, so the manager body now
lives in the ROS-free build layer (gateway_core).

The handler-facing public API (report_fault, list_faults, get_fault,
get_fault_with_env, clear_fault, get_snapshots, get_rosbag,
list_rosbags, is_available, wait_for_services) is preserved verbatim.
The single behaviour change is that get_fault_with_env now returns
the response body as JSON: data carries { "fault": {...},
"environment_data": {...} } already converted by the transport. The
single handler call site (build_sovd_fault_response) is updated to
consume the JSON, post-processing rosbag snapshots to add the per-
request bulk_data_uri and freeze_frame snapshots to extract the
primary value.

The previously static FaultManager::fault_to_json helper is removed.
Two external call sites (SSEFaultHandler, TriggerFaultSubscriber)
subscribe directly to the FaultEvent topic and convert the message
themselves; they now use the small ros2/conversions/fault_msg_conversions
module that lives at the ROS-coupled boundary alongside the transport.

The legacy include path ros2_medkit_gateway/fault_manager.hpp is
preserved as a forwarding shim. New mock-transport tests link only
against gateway_core + GTest, proving the manager logic compiles
without rclcpp on the link line.

LogManager keeps its ring-buffer storage, per-entity config map, monotonic
id counter, plugin observer pattern, and the manages_ingestion
short-circuit - all pure C++. The /rosout subscription and the
rcl_interfaces::msg::Log to LogEntry conversion move into the new
Ros2LogSource adapter under src/ros2/transports/. The adapter emits
LogEntry through a callback the manager registers at start() time;
when the primary LogProvider declares full ingestion, the manager never
calls start() (matching today's behaviour of not creating the
subscription at all in that mode).

To let the manager body live in gateway_core alongside the other
managers, the PluginManager pointer is replaced with a thin
LogProviderRegistry port (primary_log_provider + log_observers).
PluginManager implements the port inline, so production wiring is
unchanged. The handler-facing public API (get_logs, get_config,
update_config, add_log_entry, set_notifier,
set_node_to_entity_resolver, the static helpers,
inject_entry_for_testing) is preserved verbatim.

Mock-source tests link only against gateway_core + GTest, proving the
manager logic compiles without rclcpp on the link line. The legacy
include path ros2_medkit_gateway/log_manager.hpp is preserved as a
forwarding shim.
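
The callback wiring and the full-ingestion short-circuit can be sketched as below. The `LogEntry` fields, the `start(Callback)` signature, and the constructor flag standing in for the provider's manages_ingestion answer are assumptions; the sketch is single-threaded and omits the per-entity config and observers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical neutral log record.
struct LogEntry {
  std::uint64_t id{0};
  std::string node;
  std::string message;
};

// Reduced log-source port; the signature is an assumption.
class LogSource {
 public:
  using Callback = std::function<void(LogEntry)>;
  virtual ~LogSource() = default;
  virtual void start(Callback callback) = 0;
};

class LogManagerSketch {
 public:
  LogManagerSketch(LogSource & source, std::size_t capacity, bool provider_manages_ingestion)
  : capacity_(capacity) {
    assert(capacity_ > 0);
    // When a provider owns ingestion, the source is never started.
    if (!provider_manages_ingestion) {
      source.start([this](LogEntry entry) { add_log_entry(std::move(entry)); });
    }
  }

  // Assign a monotonic id and drop the oldest entry once the buffer is full.
  void add_log_entry(LogEntry entry) {
    entry.id = next_id_++;
    if (buffer_.size() == capacity_) {
      buffer_.pop_front();
    }
    buffer_.push_back(std::move(entry));
  }

  const std::deque<LogEntry> & entries() const { return buffer_; }

 private:
  std::size_t capacity_;
  std::uint64_t next_id_{1};
  std::deque<LogEntry> buffer_;
};

// Fake source that records whether start() was called and emits on demand.
class FakeLogSource : public LogSource {
 public:
  void start(Callback callback) override { callback_ = std::move(callback); }
  bool started() const { return callback_ != nullptr; }
  void emit(const std::string & node, const std::string & message) {
    if (callback_) {
      callback_(LogEntry{0, node, message});
    }
  }

 private:
  Callback callback_;
};
```

With a fake source like this, a test can check both ring-buffer eviction and that `start()` is skipped in full-ingestion mode.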
…sport

TriggerManager keeps all its condition-evaluation, retry-unresolved,
sweep-orphaned, persistence, dispatch-index, and entity-cache logic in
pure C++. The pointer to TriggerTopicSubscriber is replaced with a
shared TopicSubscriptionTransport interface; the new
Ros2TopicSubscriptionTransport adapter wraps the existing
TriggerTopicSubscriber, preserving its subscription-destructor pattern.
Per-trigger subscription handles live in the manager's tracking map -
destruction unsubscribes - so trigger lifetime fully drives subscription
lifetime.

The trigger manager source moves to src/core/managers/ so it is picked
up by gateway_core's GLOB. TriggerTopicSubscriber moves to
include/ros2_medkit_gateway/ros2/ and src/ros2/, and a forwarding
shim at the old header path keeps existing includes working.

TriggerTopicSubscriber itself becomes a generic per-handle subscription
executor: subscribe(topic, type, handle_key, callback) creates one
GenericSubscription per key, unsubscribe(handle_key) tears it down,
and pending/retry semantics are preserved with the same 60s timeout.

The handler-facing public API (create / get / list / update / remove,
wait_for_event, consume_pending_event, set_on_removed,
set_entity_children_fn, set_entity_exists_fn, set_resolve_topic_fn,
retry_unresolved_triggers, sweep_orphaned_triggers,
load_persistent_triggers) is preserved verbatim.

Mock-transport tests link only against gateway_core + GTest, proving
the manager body compiles without rclcpp on the link line.
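
The "destruction unsubscribes" ownership model can be illustrated with a small sketch. The `subscribe` signature, the `TriggerRegistrySketch` class, and the fake transport are hypothetical; the real transport also takes a message type and a callback, which are left out here.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

// RAII base: destroying the handle ends the subscription.
class TopicSubscriptionHandle {
 public:
  virtual ~TopicSubscriptionHandle() = default;
};

// Reduced subscription port; the signature is an assumption.
class TopicSubscriptionTransport {
 public:
  virtual ~TopicSubscriptionTransport() = default;
  virtual std::unique_ptr<TopicSubscriptionHandle> subscribe(
    const std::string & topic, const std::string & handle_key) = 0;
};

// Fake transport that tracks which handle keys are currently subscribed.
class FakeSubscriptionTransport : public TopicSubscriptionTransport {
  class Handle : public TopicSubscriptionHandle {
   public:
    Handle(std::set<std::string> & active, std::string key) : active_(active), key_(std::move(key)) {}
    ~Handle() override { active_.erase(key_); }

   private:
    std::set<std::string> & active_;
    std::string key_;
  };

 public:
  std::unique_ptr<TopicSubscriptionHandle> subscribe(const std::string &, const std::string & handle_key) override {
    active_.insert(handle_key);
    return std::make_unique<Handle>(active_, handle_key);
  }
  bool is_active(const std::string & handle_key) const { return active_.count(handle_key) > 0; }

 private:
  std::set<std::string> active_;
};

// Manager side: one handle per trigger, stored in the tracking map.
class TriggerRegistrySketch {
 public:
  explicit TriggerRegistrySketch(std::shared_ptr<TopicSubscriptionTransport> transport)
  : transport_(std::move(transport)) {
    assert(transport_ != nullptr);
  }

  bool create(const std::string & trigger_id, const std::string & topic) {
    if (handles_.count(trigger_id) > 0) {
      return false;  // already tracked
    }
    handles_[trigger_id] = transport_->subscribe(topic, trigger_id);
    return true;
  }

  // Erasing the map entry destroys the handle, which unsubscribes.
  bool remove(const std::string & trigger_id) { return handles_.erase(trigger_id) > 0; }

 private:
  std::shared_ptr<TopicSubscriptionTransport> transport_;  // declared first, destroyed last
  std::map<std::string, std::unique_ptr<TopicSubscriptionHandle>> handles_;
};
```

Declaring the transport before the handle map means the handles are destroyed first, so each handle's destructor still has a live transport to talk to.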

RuntimeDiscoveryStrategy is replaced by Ros2RuntimeIntrospection, which
implements the existing IntrospectionProvider interface used by the
merge pipeline for plugin-driven entity discovery. Built-in ROS graph
queries are now treated identically to plugin-provided introspection -
one chain, one merge policy.

HybridDiscoveryStrategy is removed; the equivalent runtime + manifest
+ plugin merge is the default MergePipeline configuration. The
discovery_mode parameter (runtime_only / manifest_only / hybrid) now
controls which layers the merge pipeline activates rather than which
strategy class to instantiate.

TypeIntrospection is the rosidl_typesupport_cpp + rosidl_typesupport_introspection_cpp
bridge built on top of JsonSerializer; it semantically belongs alongside the
rest of the rosidl glue rather than in the gateway. Gateway packages now
depend on the serialization library for type schemas instead of duplicating
the introspection backend.

- Move type_introspection.{hpp,cpp} to ros2_medkit_serialization (preserves
  Apache 2.0 header verbatim).
- Migrate the namespace from ros2_medkit_gateway to ros2_medkit_serialization;
  qualify all consumer call sites (gateway managers, transports, providers,
  handlers, discovery, tests).
- Update forward declarations in topic_transport.hpp,
  data_access_manager.hpp, discovery_manager.hpp.
- Move the TypeIntrospection unit suite from
  src/ros2_medkit_gateway/test/test_data_access_manager.cpp to
  src/ros2_medkit_serialization/test/test_type_introspection.cpp; register
  the new test target.
- Drop src/type_introspection.cpp from the gateway gateway_ros2 source list.

The rosidl_typesupport_cpp / rosidl_typesupport_introspection_cpp deps remain
in the gateway because the GenericClient compat shim still uses them.

Replace the cyclic wall-timer loop that called refresh_cache() every
refresh_interval_ms with an event-driven design:

- A 100 ms wall timer polls rclcpp::Node::get_graph_event()->check_and_clear()
  and runs refresh_cache() only when the ROS 2 graph actually changed (node
  up/down, topic/service/action add or remove). On a stable graph the
  callback is a single atomic check.
- A second wall timer at refresh_interval_ms acts as a safety backstop,
  forcing an unconditional refresh so liveness is preserved if a graph
  event is ever missed.

Semantics of the existing refresh_interval_ms parameter shift from
"primary refresh interval" to "safety-backstop interval", and the default
moves from 10 s (gateway_params.yaml) / 2 s (gateway.launch.py) to 30 s.
The validation range stays 100-60000 ms; existing test overrides at
1000 ms continue to work, just as a tighter backstop.

Test impact:

- New integration test verifies that a node spawned mid-run appears in
  /apps within 5 s while the gateway runs with a 30 s backstop, proving
  the graph-event poll path is what drives the refresh.
- test_operation_handlers seeds the entity cache directly and used to
  rely on refresh_cache() never firing during the 1 s settle period.
  Graph events from the test executor now do trigger refreshes; seed
  after the settle so the test's manually-injected component wins.

Design intent on idle CPU: the previous timer woke and ran the full
refresh_cache() pipeline every refresh_interval_ms regardless of whether
the graph had changed. With this change an idle gateway runs only the
100 ms graph-event check (a single atomic load) plus the 30 s backstop.

Documentation updated: README.md parameter tables, gateway_params.yaml
comments, design/aggregation.rst.
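
The two-timer design can be sketched without rclcpp as follows. `GraphEventSketch` is a stand-in for the event object obtained from `rclcpp::Node::get_graph_event()` (only its `check_and_clear()` behaviour is modelled), and `RefreshDriverSketch` with its two methods is a hypothetical name for the bodies of the two wall-timer callbacks.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Stand-in for the graph event: set when the graph changes, cleared on check.
class GraphEventSketch {
 public:
  void set() { flag_.store(true); }
  bool check_and_clear() { return flag_.exchange(false); }

 private:
  std::atomic<bool> flag_{false};
};

class RefreshDriverSketch {
 public:
  RefreshDriverSketch(GraphEventSketch & event, std::function<void()> refresh_cache)
  : event_(event), refresh_cache_(std::move(refresh_cache)) {
    assert(refresh_cache_ != nullptr);
  }

  // Body of the fast poll timer: refresh only if the graph changed.
  void on_graph_poll() {
    if (event_.check_and_clear()) {
      refresh_cache_();
    }
  }

  // Body of the backstop timer: refresh unconditionally.
  void on_backstop() { refresh_cache_(); }

 private:
  GraphEventSketch & event_;
  std::function<void()> refresh_cache_;
};
```

On a stable graph `on_graph_poll()` reduces to one atomic exchange, which is the idle-CPU property the commit message describes; a single graph change triggers exactly one refresh because the check also clears the flag.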

Persistent triggers loaded at startup were being removed before DDS
discovery had populated the entity cache. The graph-event refresh path
fires on every node up/down event, including the cold-start window where
the cache reflects only a partial view of the ROS 2 graph. Running the
orphan sweep on every graph event treated restored triggers as orphans
because their target entities had not yet been seen by this gateway,
removing them from both memory and the persistent SQLite store.

Move the sweep call to the backstop timer only. The backstop runs at the
configured refresh cadence (default 30 s, 1 s in tests), giving DDS time
to converge before any orphan check. The graph-event path keeps doing
the cheap cache refresh, so spawn-detection latency is unchanged.
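
A reduced model of the fix, assuming a hypothetical `RefreshSource` tag passed to the refresh hook and a store that holds only the target entity of each persisted trigger:

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>

enum class RefreshSource { GraphEvent, Backstop };  // hypothetical tag

class TriggerStoreSketch {
 public:
  explicit TriggerStoreSketch(std::set<std::string> persisted_targets) : targets_(std::move(persisted_targets)) {}

  // Called after every cache refresh with the entities currently known.
  void on_refresh(RefreshSource source, const std::set<std::string> & known_entities) {
    if (source != RefreshSource::Backstop) {
      return;  // graph-event refreshes may see a partial cache during cold start
    }
    for (auto it = targets_.begin(); it != targets_.end();) {
      if (known_entities.count(*it) == 0) {
        it = targets_.erase(it);  // target is gone: sweep the orphan
      } else {
        ++it;
      }
    }
  }

  bool has_trigger_for(const std::string & entity) const { return targets_.count(entity) > 0; }

 private:
  std::set<std::string> targets_;
};
```

With this gate, a graph-event refresh against an empty cache leaves restored triggers untouched, while the backstop tick still removes triggers whose targets never appear.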

Each test in the test_opcua_plugin suite connects to a non-existent OPC UA host and waits
about 3.8 s for the DNS / TCP failure path. With 13 tests the wallclock
run requires roughly 90 s. The ament_add_gtest default timeout of 60 s
truncated the run mid-suite, which CTest then surfaced as a
"missing_result" error under heavy parallelism (visible in concurrent
colcon test invocations on multi-core hosts).

Set TIMEOUT 240 to give a comfortable margin without hiding genuine
hangs.

The neutral-managers refactor moved DataAccessManager, OperationManager,
ConfigurationManager, FaultManager, LogManager, and TriggerManager into
``gateway_core`` and gave each one a Transport interface for ROS
interaction. Update the layered-library note to reflect that placement
and the new neutral interfaces.

TriggerTopicSubscriber became a generic per-handle subscription executor
wrapped by Ros2TopicSubscriptionTransport. Update its class summary,
component list, and the TriggerManager arrow on the class diagram so
they describe the new per-trigger handle ownership rather than the old
reference-counted shared-subscription model.

Apply clang-tidy fixes across the files this branch already modifies so
the incremental clang-tidy job (which scans every file changed in a PR)
stays clean.

- ``fault_handlers.cpp``: replace temp-allocating ``operator+`` chain in
  ``bulk_data_uri`` construction with explicit ``operator+=`` /
  ``std::move`` (performance-inefficient-string-concatenation).
- ``test_configuration_manager.cpp``: pre-allocate ``threads.reserve(...)``
  before ``emplace_back`` loops (performance-inefficient-vector-operation).
- ``test_configuration_manager_routing.cpp``,
  ``test_data_access_manager_routing.cpp``,
  ``test_operation_manager_routing.cpp``,
  ``test_operation_handlers.cpp``: name unused parameters in mock and
  override signatures (readability-named-parameter,
  misc-unused-parameters).
- ``test_fault_handlers.cpp``: switch single-character
  ``std::string::find("X")`` calls to ``find('X')``
  (performance-faster-string-find).
- ``test_operation_handlers.cpp``: replace ``std::bind(...)`` with
  forwarding lambdas (modernize-avoid-bind), and pass shared_ptr
  arguments by const-reference instead of by value
  (performance-unnecessary-value-param).
- ``test_trigger_manager_routing.cpp``: use ``empty()`` instead of
  ``size()`` in a boolean context (readability-container-size-empty).

After these changes ``clang-tidy -p build`` is clean on every cpp this
branch touches.
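
For readers unfamiliar with these checks, three of the patterns look roughly like this in isolation. The function names and the `/items/` path segment are made up for the example and are not taken from the changed files.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// performance-faster-string-find: a single character uses the char overload.
inline bool contains_separator(const std::string & text) {
  return text.find('T') != std::string::npos;
}

// performance-inefficient-string-concatenation: append in place instead of
// chaining operator+, which creates a temporary string per step.
inline std::string build_uri(std::string base, const std::string & id) {
  assert(!id.empty());
  base += "/items/";  // hypothetical path segment
  base += id;
  return base;
}

// performance-inefficient-vector-operation: reserve before an emplace_back loop.
inline std::vector<std::string> make_labels(std::size_t count) {
  std::vector<std::string> labels;
  labels.reserve(count);
  for (std::size_t i = 0; i < count; ++i) {
    labels.emplace_back("worker-" + std::to_string(i));
  }
  return labels;
}
```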

Follow-up to the earlier clang-tidy fixup. After clang-format wrapped
the goal-callback lambda onto its own line, clang-tidy re-evaluated
the parameter list and re-flagged ``goal`` as a copied
``std::shared_ptr`` only used as a const reference. Switch the
parameter to ``const ...&`` to match the rest of the action lambdas
in the file.
…hook

clang-tidy prints a 'X warnings generated. Suppressed Y' summary plus
source snippets on stderr even when every diagnostic is filtered out.
Streaming that for every clean file across a 100-file pre-push run fills
pre-commit's output buffer; the framework then fails with
BlockingIOError on sys.stdout.buffer.write and the push aborts before
git ever opens the connection.

Capture clang-tidy stdout+stderr per file and only emit it on non-zero
exit. Add --quiet to drop the progress lines that clang-tidy keeps even
when there are no diagnostics.

Effect: clean files produce zero output, so the framework never has a
multi-megabyte buffer to flush; only files with real findings produce
output, exactly as before.
…ndant c_str

Two pre-existing clang-tidy findings on the test fixture surface in the
pre-push hook now that the gateway PR brings the affected file into
scope:

- LocalHttpServer owns a std::thread and a raw httplib::Server pointer
  and declares a non-default destructor, so the project's
  cppcoreguidelines-special-member-functions check (with
  AllowSoleDefaultDtor) demands all five special members. Mark copy/move
  as = delete since the wrapped server cannot be safely duplicated.
- The httplib::Server::Get overload accepts a std::string by const
  reference, so the .c_str() conversion is redundant
  (readability-redundant-string-cstr).
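
The special-member fix follows the usual pattern for a type that owns a raw resource. A minimal sketch, with `ServerSketch` and `LocalServerFixtureSketch` as hypothetical stand-ins for httplib::Server and LocalHttpServer (the thread member is omitted):

```cpp
#include <type_traits>

struct ServerSketch {
  int port{0};
};

class LocalServerFixtureSketch {
 public:
  LocalServerFixtureSketch() : server_(new ServerSketch{}) {}
  ~LocalServerFixtureSketch() { delete server_; }  // non-default destructor

  // Declaring the destructor obliges the other four special members under
  // cppcoreguidelines-special-member-functions; the resource is not
  // duplicable, so all four are deleted.
  LocalServerFixtureSketch(const LocalServerFixtureSketch &) = delete;
  LocalServerFixtureSketch & operator=(const LocalServerFixtureSketch &) = delete;
  LocalServerFixtureSketch(LocalServerFixtureSketch &&) = delete;
  LocalServerFixtureSketch & operator=(LocalServerFixtureSketch &&) = delete;

 private:
  ServerSketch * server_;
};

static_assert(!std::is_copy_constructible_v<LocalServerFixtureSketch>, "copy must be disabled");
static_assert(!std::is_move_constructible_v<LocalServerFixtureSketch>, "move must be disabled");
```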
@bburda bburda force-pushed the feat/medkit-managers-provider-injection branch from 9bae856 to efd9e3e Compare May 10, 2026 17:17
bburda added 3 commits May 10, 2026 19:18
…ager

Two pre-existing clang-tidy findings on the test fixture surface in the
pre-push hook now that the routing refactor brings the affected file
into scope:

- The local using json = nlohmann::json shadows the namespace-scope
  alias the manager+transport headers already export; rely on the
  inherited one (clang-diagnostic-shadow).
- std::string::find("T") is the const-char-pointer overload; the
  single-character literal calls for find('T')
  (performance-faster-string-find).
…dentation

docutils 0.21 reads the second line of a wrapped sub-bullet as a new
indentation block (build-docs CI fails with -W under sphinx 8). Collapse
the wrap to a single line so the bullet text stays in the parent list.

Affects only line 466 of the design index; no semantic content change.
…stripped libcpp-httplib TSan races

Two CI failures uncovered after the rebase to the graph-event-driven
discovery refresh:

ASan failure on OperationHandlersFixtureTest.ListExecutionsReturnsTrackedActionGoal:
  Sending an action goal subscribes to the action's feedback/result
  topics, which the gateway's graph-event refresh observes and uses to
  rewrite the entity cache from its (empty) own discovery view, wiping
  the manually seeded engine component. Under ASan the refresh has more
  wall-clock to land between create_action_execution() returning and the
  subsequent handle_list_executions() call. Re-seed the cache from the
  helper after handle_create_execution so any subsequent handler call
  still sees the engine entity.

TSan races in libcpp-httplib.so triggered by daisy-chain aggregation:
  The reported races live in cpp-httplib's own listen / accept / response
  paths between two server threads spawned from RESTServer::start.
  An existing race:httplib::* suppression covers symbolised frames; the
  system-packaged .so however is shipped stripped, so the TSan stack ends
  in <null> frames inside libcpp-httplib.so.0.14 and the symbolic
  suppression no longer matches. Add a called_from_lib: rule for both the
  versioned and unversioned soname so the stripped frames are silenced
  the same way upstream-symbolised builds were.
@bburda bburda marked this pull request as ready for review May 11, 2026 10:07
Copilot AI review requested due to automatic review settings May 11, 2026 10:07

Copilot AI left a comment

Pull request overview

Refactors the gateway so SOVD managers and related business logic live in the ROS-free gateway_core layer behind neutral Transport interfaces, with ROS 2 specifics moved into gateway_ros2 adapters. This improves unit-test isolation (no rclcpp link dependency for manager tests), reshapes runtime discovery to flow through the IntrospectionProvider chain, and switches refresh behavior to primarily graph-event-driven with a periodic safety backstop.

Changes:

  • Introduces/expands neutral Transport ports (topic/service/action/parameter/fault/log/trigger-subscription) and routes core managers through them, enabling gateway_core-only unit tests with mock transports.
  • Reworks discovery to use Ros2RuntimeIntrospection (as an IntrospectionProvider) and updates docs/config defaults for graph-event-driven refresh + backstop timer.
  • Relocates TypeIntrospection into ros2_medkit_serialization and updates gateway call sites/tests accordingly.

Reviewed changes

Copilot reviewed 107 out of 107 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| tsan_suppressions.txt | Expands TSan suppressions to match stripped libcpp-httplib.so frames by library name. |
| scripts/clang-tidy-diff.sh | Suppresses “clean file” clang-tidy output unless failures occur (pre-commit friendliness). |
| scripts/check_no_naked_subscriptions.sh | Updates allowed legacy files list to reflect ROS2 adapters moved under src/ros2/. |
| src/ros2_medkit_serialization/CMakeLists.txt | Adds TypeIntrospection source + new gtest for it. |
| src/ros2_medkit_serialization/include/ros2_medkit_serialization/type_introspection.hpp | Moves TypeIntrospection into ros2_medkit_serialization namespace and updates serializer member type. |
| src/ros2_medkit_serialization/src/type_introspection.cpp | Updates include/namespace and exception types after relocation into serialization package. |
| src/ros2_medkit_serialization/test/test_type_introspection.cpp | Adds unit tests covering schema/template/info retrieval and caching behavior. |
| src/ros2_medkit_plugins/ros2_medkit_opcua/CMakeLists.txt | Increases test_opcua_plugin timeout to 240s to avoid CI truncation. |
| src/ros2_medkit_plugins/ros2_medkit_graph_provider/test/test_graph_provider_plugin.cpp | Fixes httplib route registration call style and makes LocalHttpServer non-copyable/movable for RAII safety. |
| src/ros2_medkit_gateway/CMakeLists.txt | Rewires gateway_ros2 sources (adds ROS2 adapters; removes managers moved to core) and adds gateway_core-only routing tests. |
| src/ros2_medkit_gateway/config/gateway_params.yaml | Updates refresh semantics docs + default refresh_interval_ms for safety-backstop mode. |
| src/ros2_medkit_gateway/launch/gateway.launch.py | Updates launch arg description to clarify safety-backstop refresh interval meaning. |
| src/ros2_medkit_gateway/launch/gateway_https.launch.py | Same as above for HTTPS launch file. |
| src/ros2_medkit_gateway/README.md | Updates documented default and semantics for refresh_interval_ms after graph-event refresh change. |
| src/ros2_medkit_gateway/design/index.rst | Updates design docs for managers-in-core + transport adapters + discovery class diagram changes. |
| src/ros2_medkit_gateway/design/aggregation.rst | Updates aggregation health-check description to match graph-event refresh + backstop timer. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/gateway_node.hpp | Adds members for ROS2 transport adapters and graph-event/backstop timers; updates refresh locking comments. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/discovery_manager.hpp | Refactors discovery manager to own pipeline + cached result, implement ServiceActionResolver, and use Ros2RuntimeIntrospection. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/merge_pipeline.hpp | Updates thread-safety warnings to refer to DiscoveryManager’s caching/mutexing. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/discovery/discovery_strategy.hpp | Marks DiscoveryStrategy as legacy, with discovery now driven by providers/pipeline. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/discovery/service_action_resolver.hpp | Adds a neutral resolver interface to decouple OperationManager from DiscoveryManager headers. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/discovery/layers/runtime_layer.hpp | Switches RuntimeLayer dependency to Ros2RuntimeIntrospection. |
| src/ros2_medkit_gateway/src/discovery/layers/runtime_layer.cpp | Implements RuntimeLayer changes using Ros2RuntimeIntrospection. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/providers/ros2_runtime_introspection.hpp | Introduces ROS2 graph introspection provider implementing IntrospectionProvider + direct query helpers. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/topic_transport.hpp | Adds TopicTransport port + TopicSample neutral result struct. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/service_transport.hpp | Adds ServiceTransport port for generic service calls returning JSON. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/action_transport.hpp | Adds ActionTransport port for action send/cancel/get_result + status subscription callbacks. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/parameter_transport.hpp | Adds ParameterTransport port for parameter CRUD + cached defaults + shutdown/invalidate semantics. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/fault_service_transport.hpp | Adds FaultServiceTransport port returning neutral JSON-shaped fault results. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/log_source.hpp | Adds LogSource port for /rosout-like log ingestion. |
| src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/transports/topic_subscription_transport.hpp | Adds TopicSubscriptionTransport port + RAII TopicSubscriptionHandle for trigger data subscriptions. |
src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_service_transport.hpp Declares ROS2 adapter implementing ServiceTransport with generic-client caching.
src/ros2_medkit_gateway/src/ros2/transports/ros2_service_transport.cpp Implements ServiceTransport via generic clients + serialization.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_action_transport.hpp Declares ROS2 adapter implementing ActionTransport (generic internal service clients + status subs).
src/ros2_medkit_gateway/src/ros2/transports/ros2_action_transport.cpp Implements ActionTransport via generic internal action services and status subscriptions.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_parameter_transport.hpp Declares ROS2 adapter implementing ParameterTransport with client/default/negative caches and spin mutex.
src/ros2_medkit_gateway/src/ros2/transports/ros2_parameter_transport.cpp Implements ParameterTransport via SyncParametersClient + JSON conversions + caches.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_fault_service_transport.hpp Declares ROS2 adapter implementing FaultServiceTransport (7 service clients + per-client mutexes).
src/ros2_medkit_gateway/src/ros2/transports/ros2_fault_service_transport.cpp Implements Fault service calls and message-to-JSON conversions in the adapter.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_log_source.hpp Declares ROS2 adapter implementing LogSource via /rosout subscription.
src/ros2_medkit_gateway/src/ros2/transports/ros2_log_source.cpp Implements /rosout ingestion + callback wiring for core LogManager.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_topic_transport.hpp Declares ROS2 adapter implementing TopicTransport and owning TypeIntrospection.
src/ros2_medkit_gateway/src/ros2/transports/ros2_topic_transport.cpp Implements Topic publish/sample/count and exposes TypeIntrospection to core.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/transports/ros2_topic_subscription_transport.hpp Declares ROS2 adapter implementing TopicSubscriptionTransport by wrapping TriggerTopicSubscriber.
src/ros2_medkit_gateway/src/ros2/transports/ros2_topic_subscription_transport.cpp Implements TopicSubscriptionTransport subscribe() returning RAII handles for deterministic unsubscribe.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/trigger_topic_subscriber.hpp Converts legacy include to a backwards-compat shim pointing at the new ros2 header.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/trigger_topic_subscriber.hpp Introduces new per-handle TriggerTopicSubscriber API (no ref-counted per-topic sharing).
src/ros2_medkit_gateway/src/ros2/trigger_topic_subscriber.cpp Implements the per-handle trigger subscription executor + retry behavior.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/operations/operation_types.hpp Introduces ROS-neutral operation result structs/enums for services/actions.
src/ros2_medkit_gateway/src/core/operations/operation_types.cpp Implements action_status_to_string() helper.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/managers/data_access_manager.hpp Adds core DataAccessManager using TopicTransport.
src/ros2_medkit_gateway/src/core/managers/data_access_manager.cpp Implements publish/sample logic and the “no publishers => metadata_only” fast path.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/managers/configuration_manager.hpp Adds core ConfigurationManager using ParameterTransport + reset orchestration.
src/ros2_medkit_gateway/src/core/managers/configuration_manager.cpp Implements shutdown semantics and reset operations via cached defaults.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/configuration/parameter_types.hpp Introduces ParameterResult + structured error codes in core.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/faults/fault_types.hpp Introduces core FaultResult / FaultWithEnvJsonResult JSON-shaped contracts.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/managers/fault_manager.hpp Adds core FaultManager delegating to FaultServiceTransport.
src/ros2_medkit_gateway/src/core/managers/fault_manager.cpp Implements FaultManager as pure delegation layer over the transport.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/fault_manager.hpp Converts legacy include to a backwards-compat shim pointing at core manager.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/data_access_manager.hpp Converts legacy include to a backwards-compat shim pointing at core manager.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/providers/log_provider_registry.hpp Introduces LogProviderRegistry port so LogManager can depend on a neutral interface.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/plugins/plugin_manager.hpp Makes PluginManager implement LogProviderRegistry via thin forwards.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/managers/trigger_manager.hpp Updates TriggerManager to use TopicSubscriptionTransport + per-trigger RAII handles instead of TriggerTopicSubscriber*.
src/ros2_medkit_gateway/src/http/handlers/data_handlers.cpp Updates includes to use relocated TypeIntrospection.
src/ros2_medkit_gateway/src/http/handlers/operation_handlers.cpp Updates includes to use relocated TypeIntrospection.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/fault_handlers.hpp Changes build_sovd_fault_response() signature to consume transport-supplied JSON instead of ROS messages.
src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp Reworks SOVD fault response building to post-process transport JSON and construct per-request bulk_data_uri.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/ros2/conversions/fault_msg_conversions.hpp Adds ROS-boundary conversion helpers for Fault/EnvironmentData -> JSON.
src/ros2_medkit_gateway/src/ros2/conversions/fault_msg_conversions.cpp Implements the fault/environment-data JSON conversions used by SSE + triggers + fault transport.
src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp Switches to shared fault JSON conversion helper (no longer depends on FaultManager::fault_to_json).
src/ros2_medkit_gateway/include/ros2_medkit_gateway/trigger_fault_subscriber.hpp Updates docs to refer to new fault conversion helper.
src/ros2_medkit_gateway/src/trigger_fault_subscriber.cpp Switches to shared fault JSON conversion helper.
src/ros2_medkit_gateway/src/openapi/schema_builder.hpp Updates comment to reference new fault conversion helper.
src/ros2_medkit_gateway/test/test_gateway_core_smoke.cpp Extends core-only smoke test to include core managers + transport ports + resolver interface.
src/ros2_medkit_gateway/test/test_data_access_manager_routing.cpp Adds gateway_core-only routing test via MockTopicTransport.
src/ros2_medkit_gateway/test/test_configuration_manager.cpp Updates tests to construct ConfigurationManager via Ros2ParameterTransport adapter.
src/ros2_medkit_gateway/test/test_operation_manager.cpp Updates tests to construct OperationManager with Ros2 service/action transport adapters.
src/ros2_medkit_gateway/test/test_operation_handlers.cpp Updates action server callbacks and fixes cache seeding timing with graph-event refresh behavior.
src/ros2_medkit_gateway/test/test_fault_manager.cpp Updates tests to construct FaultManager via Ros2FaultServiceTransport adapter.
src/ros2_medkit_gateway/test/test_log_manager.cpp Updates tests to construct LogManager via Ros2LogSource adapter.
src/ros2_medkit_gateway/test/test_runtime_discovery.cpp Updates runtime discovery tests to use Ros2RuntimeIntrospection and introspect() behavior.
src/ros2_medkit_gateway/test/test_trigger_manager.cpp Minor tidy changes (removes unused alias; uses char overload of find()).
src/ros2_medkit_gateway/src/discovery/hybrid_discovery.cpp Removes HybridDiscoveryStrategy implementation (replaced by DiscoveryManager pipeline caching).
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/hybrid_discovery.hpp Removes HybridDiscoveryStrategy header (replaced by pipeline caching in DiscoveryManager).
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/runtime_discovery.hpp Removes RuntimeDiscoveryStrategy header (replaced by Ros2RuntimeIntrospection).

Comment on lines +74 to +78
void Ros2LogSource::stop() {
  // Mark shutdown so any in-flight callback short-circuits before touching
  // members that are about to destruct.
  shutdown_requested_.store(true, std::memory_order_release);

Comment on lines +79 to +83

  if (!client->wait_for_service(std::chrono::seconds(5))) {
    result.success = false;
    result.error_message = "Service not available: " + service_path;
    return result;

Comment on lines +27 to +30
Ros2ServiceTransport::Ros2ServiceTransport(rclcpp::Node * node)
  : node_(node), serializer_(std::make_shared<ros2_medkit_serialization::JsonSerializer>()) {
  RCLCPP_INFO(node_->get_logger(), "Ros2ServiceTransport initialised (native serialization)");
}

Comment on lines +164 to +168

  if (!clients.send_goal_client->wait_for_service(std::chrono::seconds(5))) {
    result.error_message = "Action server not available: " + action_path;
    return result;
  }

bburda added 6 commits May 11, 2026 13:23
Five issues surfaced from a multi-agent self-review on the Transport
refactor:

Ros2ActionTransport::generate_uuid - static thread_local engine:
  The translation unit is part of gateway_lib which links into
  test_gateway_plugin.so (a CMake MODULE). Per CLAUDE.md, thread_local
  storage in MODULE-target code uses initial-exec TLS (TPOFF32) which is
  incompatible with shared objects and can crash at load time on
  toolchains that do not patch it. Replace with a mutex-guarded static
  std::mt19937 - UUID generation is one-per-goal so contention is
  irrelevant.
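
A minimal sketch of the mutex-guarded replacement described above. The function name `generate_uuid_sketch`, the 16-byte return type, and the `std::random_device` seeding are assumptions made for illustration; the adapter's real code may differ.

```cpp
#include <array>
#include <cstdint>
#include <mutex>
#include <random>

// Hypothetical stand-in for Ros2ActionTransport::generate_uuid.
// No thread_local storage is used: one process-wide engine is shared and
// every access is serialised through a mutex.
std::array<std::uint8_t, 16> generate_uuid_sketch() {
  static std::mutex engine_mutex;
  static std::mt19937 engine{std::random_device{}()};

  // uniform_int_distribution is not specified for 8-bit types, so draw
  // ints in [0, 255] and narrow explicitly.
  std::uniform_int_distribution<int> byte_dist(0, 255);

  std::array<std::uint8_t, 16> uuid{};
  std::lock_guard<std::mutex> lock(engine_mutex);
  for (auto & byte : uuid) {
    byte = static_cast<std::uint8_t>(byte_dist(engine));
  }
  return uuid;
}
```

Function-local statics are initialised thread-safely since C++11, so the only shared state that needs explicit locking is the engine itself.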

Ros2FaultServiceTransport::list_rosbags - parallel-array UB:
  The loop indexed file_paths[i], formats[i], durations_sec[i], and
  sizes_bytes[i] for i up to fault_codes.size() without validating that
  all five vectors have the same length. A server bug or schema drift on
  the remote ListRosbags
  service would have produced a UB read past end-of-vector. Compute the
  shortest length, surface the mismatch as a FaultResult error, and only
  index the validated range.
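
The length check can be sketched as follows. `RosbagListing`, `ValidatedRange`, and `validate_parallel_arrays` are hypothetical names that mirror the five fields listed above; how the real adapter turns the mismatch into a FaultResult error is not shown here.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical mirror of the parallel arrays in a ListRosbags response.
struct RosbagListing {
  std::vector<std::string> fault_codes;
  std::vector<std::string> file_paths;
  std::vector<std::string> formats;
  std::vector<double> durations_sec;
  std::vector<unsigned long long> sizes_bytes;
};

struct ValidatedRange {
  std::size_t count;  // entries that are safe to index in every vector
  bool mismatch;      // true when the vectors disagree on length
};

// Compute the shortest common length and report whether any vector differs.
ValidatedRange validate_parallel_arrays(const RosbagListing & listing) {
  const auto bounds = std::minmax({listing.fault_codes.size(), listing.file_paths.size(),
                                   listing.formats.size(), listing.durations_sec.size(),
                                   listing.sizes_bytes.size()});
  return ValidatedRange{bounds.first, bounds.first != bounds.second};
}
```

A caller would loop only over `[0, count)` and attach an error when `mismatch` is set.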

TriggerTopicSubscriber::subscribe - silent failure on rclcpp throw:
  Failures in the underlying rclcpp subscription were logged, but the
  function returned void, so callers could not see them.
  Ros2TopicSubscriptionTransport then recorded a handle for a dead
  subscription, TriggerManager added the trigger to its map, and the HTTP
  handler returned 201 for a trigger that never fires. The subscriber now
  rethrows, and the transport adapter catches the exception and returns
  nullptr. On nullptr, TriggerManager rolls back create() with a runtime
  error, keeps retry entries on the pending list (they are not marked
  resolved), and re-queues restored persistent triggers for retry.

Ros2ParameterTransport::cache_default_values - empty per-parameter catch:
  Per-parameter get_parameters({name}) exceptions were swallowed silently,
  so reset_parameter later returned NO_DEFAULTS_CACHED with no diagnostic
  trail. Log a throttled WARN with the parameter name, node name, and
  exception text so the failure is debuggable from /rosout.

docs/design/index.rst - stale class layout:
  Re-tag the six managers from gateway_ros2 to gateway_core to match the
  post-refactor target placement. Drop the removed
  ManifestDiscoveryStrategy class block and bullet, replace with
  ManifestLayer. Replace JsonSerializer ownership arrows on
  Operation/DataAccess Manager with Service/Action/TopicTransport
  arrows. Replace the ConfigurationManager -> rclcpp::Node arrow with
  ParameterTransport.

Sanitizer-tsan was intermittently failing on test_daisy_chain_aggregation
because gateway_node-2 (peer_b) needed more than 15s to exit after SIGINT.
The hang sat between TriggerTopicSubscriber::shutdown completion and the
end of GatewayNode::~GatewayNode - i.e. somewhere in REST server stop or
member-destruction order. ASan-only runs squeezed in under the 15s grace;
TSan (heavier instrumentation) blew past it and the launch system
escalated to SIGKILL with exit -9.

Root cause: AggregationManager has no shutdown path. When SIGINT arrived
mid-forward, peer_b had REST worker threads blocked inside
PeerClient::forward_request's local httplib::Client::Get/Post call to
peer_c. The local Client cannot be stopped externally, and the per-call
read timeout (default 5000ms, ~25-50s effective under TSan) gates worker
exit. cpp-httplib Server::stop unblocks the accept loop but cannot
interrupt blocked client I/O on the worker side.

Fix: track every active outbound httplib::Client per PeerClient and add
shutdown() that calls stop() on each. PeerClient::ScopedClient RAII
helper registers/unregisters with the active set and is used by
forward_request, forward_and_get_json, and fetch_entities (replacing
the previous local Client). AggregationManager::shutdown() forwards to
every static + discovered peer. GatewayNode::~GatewayNode invokes
aggregation_mgr_->shutdown() right after stopping mDNS and BEFORE
stop_rest_server(), so REST workers mid-forward unwind on Error::Canceled
instead of waiting out the full read timeout.

Subsequent forwards after shutdown short-circuit with 503/error rather
than dialling the peer.
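
The register/unregister pattern can be sketched without cpp-httplib. `FakeClient`, `ActiveClientRegistry`, and `ScopedClientSketch` are hypothetical stand-ins; in the real code the tracked object is an httplib::Client and the registry lives inside PeerClient.

```cpp
#include <atomic>
#include <cstddef>
#include <mutex>
#include <unordered_set>

// Hypothetical stand-in for an outbound HTTP client with a stop() method.
struct FakeClient {
  std::atomic<bool> stopped{false};
  void stop() { stopped.store(true); }
};

class ActiveClientRegistry {
 public:
  // Returns false once shutdown() has run, so late callers can short-circuit.
  bool add(FakeClient * client) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (shutdown_) {
      return false;
    }
    active_.insert(client);
    return true;
  }
  void remove(FakeClient * client) {
    std::lock_guard<std::mutex> lock(mutex_);
    active_.erase(client);
  }
  // stop() is called under the lock so no client can unregister and be
  // destroyed while it is being stopped.
  void shutdown() {
    std::lock_guard<std::mutex> lock(mutex_);
    shutdown_ = true;
    for (FakeClient * client : active_) {
      client->stop();
    }
  }
  std::size_t active_count() {
    std::lock_guard<std::mutex> lock(mutex_);
    return active_.size();
  }

 private:
  std::mutex mutex_;
  std::unordered_set<FakeClient *> active_;
  bool shutdown_{false};
};

// RAII helper: registers its client on construction, unregisters on destruction.
class ScopedClientSketch {
 public:
  explicit ScopedClientSketch(ActiveClientRegistry & registry)
    : registry_(registry), registered_(registry_.add(&client_)) {}
  ~ScopedClientSketch() {
    if (registered_) {
      registry_.remove(&client_);
    }
  }
  ScopedClientSketch(const ScopedClientSketch &) = delete;
  ScopedClientSketch & operator=(const ScopedClientSketch &) = delete;

  bool usable() const { return registered_; }
  FakeClient & client() { return client_; }

 private:
  ActiveClientRegistry & registry_;
  FakeClient client_;
  bool registered_;
};
```

A forward path would construct the scoped client, return a 503-style error when `usable()` is false, and otherwise issue the request through `client()`.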

The first cut of PeerClient::shutdown() also called client_->stop() on
the lazily-created health-check client to be thorough. That added a TSan
report ('unlock of an unlocked mutex (or by a wrong thread)') and a
SIGABRT on Rolling: the health-check client is exclusively used inside
check_health() which holds client_mutex_ around the entire Get() call;
calling stop() on it from the shutdown thread races with cpp-httplib's
own socket_mutex_ while check_health still owns the socket internals.
On glibc + libstdc++ on Noble the mutex misuse aborts the process
(test_beacon_param::test_exit_codes saw exit code -6); under TSan the
warning is emitted and the test fails with exit code 66.

The health-check client is short-lived (single GET, bounded read
timeout), so leaving it alone during shutdown only delays exit by at
most one health-check duration. The multi-second shutdown hang we set
out to fix came from in-flight forward_request / fetch_entities calls,
which use ScopedClient and ARE in the to_stop snapshot.

…wn interrupt

After the previous round of shutdown work, test_leaf_collision_aggregation
still escalated to SIGKILL under TSan: in-flight forward_request calls
unblocked promptly, but the periodic check_health() call inside
PeerClient was still blocked in client_->Get() against the unhealthy
peer. That call holds client_mutex_ for the full read timeout (~5s
nominal, ~25-50s under TSan), and the cpp-httplib Client cannot be
interrupted from another thread unless someone calls Client::stop() on
it.

Register the lazily-created shared client_ with the active-client set
on first creation. shutdown() then iterates the set as before and calls
stop() on every registered client, which interrupts the in-flight
health-check Get() the same way it interrupts forward_request and
fetch_entities. The client lives for the rest of the PeerClient's
lifetime so we never unregister it; declaration order guarantees
active_clients_ is destroyed before client_ so no dangling pointer ever
sits in the active set.
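
The declaration-order argument relies on C++ destroying non-static members in reverse order of declaration. A small sketch with hypothetical `Tracer` and `PeerLike` types makes the order observable:

```cpp
#include <string>
#include <vector>

// Records its name into a shared log when destroyed.
class Tracer {
 public:
  Tracer(std::vector<std::string> * log, std::string name) : log_(log), name_(std::move(name)) {}
  ~Tracer() { log_->push_back(name_); }

 private:
  std::vector<std::string> * log_;
  std::string name_;
};

// Hypothetical layout: client_ is declared before active_clients_, so
// active_clients_ is destroyed first and never outlives the client it tracks.
struct PeerLike {
  explicit PeerLike(std::vector<std::string> * log)
    : client_(log, "client_"), active_clients_(log, "active_clients_") {}

  Tracer client_;
  Tracer active_clients_;
};
```

Reordering the two members would silently invert the destruction order, so a comment next to the declarations is a cheap safeguard.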

Calling httplib::Client::stop() from PeerClient::shutdown() to interrupt
an in-flight Get/Post on a sibling thread is the upstream-documented way
to cancel a blocking client I/O call. The cpp-httplib internal mutex
sequencing in that path tickles TSan with an 'unlock of an unlocked
mutex (or by a wrong thread)' on every aggregation shutdown, but the
supported API contract is honoured and ASan / Rolling do not abort.

TSan attributes the report to the caller (peer_client.cpp:780 in
PeerClient::shutdown), so suppress by that symbol rather than by
called_from_lib - the existing 'called_from_lib:libcpp-httplib.so.0.14'
suppression covers race: reports but not mutex: reports against a stack
whose top frame is our own translation unit.

Health-check Get() against an unresponsive peer blocks for the
configured forward timeout (5s nominal). Under TSan that becomes
~25s effective and exceeds the launch_test 15s SIGINT-to-SIGKILL
grace, producing exit -9. We cannot use httplib::Client::stop() to
interrupt the call - cpp-httplib's stop() unlocks an internal mutex
from a thread that did not lock it, and glibc + pthread debug
(Ubuntu Noble) treats that as a fatal SIGABRT.

Instead, hardcode the health-check timeouts at 1s (or the configured
forward timeout, whichever is smaller). Health checks are
ping-shaped by design; an unresponsive peer is 'unhealthy' whether
we wait 1s or 5s. Capping the health timeout bounds shutdown delay
to ~5s under TSan, well inside the grace window, without affecting
forward semantics (forward_request and fetch_entities still use the
full configured timeout via ScopedClient, which IS interruptible
through the active-client registry).
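
The cap itself is a one-line minimum. In this sketch the helper name `health_check_timeout` and the millisecond representation are assumptions; only the 1 s ceiling and the "whichever is smaller" rule come from the description above.

```cpp
#include <algorithm>
#include <chrono>

// Health checks use the configured forward timeout, but never more than 1 s.
std::chrono::milliseconds health_check_timeout(std::chrono::milliseconds forward_timeout) {
  const std::chrono::milliseconds health_cap{1000};
  return std::min(forward_timeout, health_cap);
}
```

With the nominal 5000 ms forward timeout this yields 1000 ms, while a deployment configured with a 250 ms forward timeout keeps 250 ms.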

@bburda bburda requested a review from mfaferek93 May 12, 2026 11:36

Hardens the manager/transport split shipped earlier on this branch by
closing correctness, observability, and lifecycle gaps surfaced during
internal validation:

- TriggerManager::create() now returns TriggerError::SubscribeFailed
  via tl::make_unexpected instead of throwing std::runtime_error out of
  its tl::expected-returning signature; the HTTP handler maps it to 503
  through an exhaustive switch with a defensive-500 sentinel for future
  enum values. Rollback also clears the dispatch index and (for
  persistent triggers) the SQLite record.
- LogManager regains observability via an injected log_sink callback;
  gateway_node forwards to RCLCPP_WARN/ERROR so plugin-provider
  exceptions surface on /rosout again instead of std::cerr.
- Ros2LogSource holds its mutex through callback dispatch so a racing
  stop() cannot null the callback while a copy is mid-dispatch -
  honouring the documented post-stop contract.
- core/discovery/layers/runtime_layer.hpp forward-declares
  Ros2RuntimeIntrospection so the header no longer transitively pulls
  rclcpp; test_gateway_core_smoke now includes it as a link-time guard
  against future transitive leaks the grep-based purity check cannot
  see.
- In-tree includes migrate to the canonical core/managers/... paths;
  the six legacy shim headers emit #pragma message deprecation notes
  for out-of-tree consumers.
- Gateway-node constructor declares parameter-service tuning parameters
  alongside the other declare_parameter calls, leaving the
  transport-wiring block to two make_shared lines.
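
The typed-error path in the first bullet can be sketched as below. A plain struct stands in for tl::expected so the block has no third-party dependency; `CreateResult`, `create_trigger`, `http_status_for`, and the `InvalidRequest` enumerator are hypothetical. Only `TriggerError::SubscribeFailed`, the 503 mapping, and the defensive 500 come from the description above.

```cpp
#include <string>

enum class TriggerError {
  SubscribeFailed,  // named in the commit message
  InvalidRequest    // hypothetical second value so the switch has two cases
};

// Stand-in for tl::expected<std::string, TriggerError>.
struct CreateResult {
  bool ok;
  std::string trigger_id;  // valid only when ok is true
  TriggerError error;      // valid only when ok is false
};

// Returns an error value instead of throwing when the subscription fails.
CreateResult create_trigger(bool subscribe_succeeded) {
  if (!subscribe_succeeded) {
    return CreateResult{false, "", TriggerError::SubscribeFailed};
  }
  return CreateResult{true, "trigger-1", TriggerError::SubscribeFailed};
}

// Exhaustive switch with no default label, so the compiler can warn when a
// new enumerator is added; the trailing return is the defensive 500.
int http_status_for(TriggerError error) {
  switch (error) {
    case TriggerError::SubscribeFailed:
      return 503;
    case TriggerError::InvalidRequest:
      return 400;
  }
  return 500;
}
```

Leaving out `default:` is deliberate: `-Wswitch` then flags any enumerator the handler forgets, while unknown values at runtime still fall through to 500.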

Tests + docs:
- New routing tests pin two behaviours: subscribe-failure on
  TriggerManager::create returns the typed error variant, and
  persistent-trigger restore queues the entry onto unresolved_data_triggers_
  when the transport rejects the re-subscription.
- test_graph_event_discovery's latency assertion drops to a 2 s
  graph-event-specific bound so backstop-driven regressions cannot slip
  through the previous 30 s trivially-true assertion.
- New test_service_action_resolver_contract pins the abstract interface
  contract; the binary links against gateway_core alone (no rclcpp).
- New test_ros2_log_source.NoCallbackFiresAfterStopReturns regression
  test for the LogSource contract.
- TSan suppressions narrow: drop mutex:PeerClient::shutdown so future
  genuine races attributed to our own symbols are no longer silenced;
  the cpp-httplib stop() pattern stays covered by called_from_lib.
- docs/config/server.rst and docs/troubleshooting.rst track the new
  30000 ms refresh_interval_ms default and its graph-event-driven
  semantics; design/index.rst gains a mock-transport code example;
  TopicTransport documents the deliberate ros2_medkit_serialization
  coupling on get_type_introspection(); LogSource documents why it is
  not named LogTransport.
@bburda bburda force-pushed the feat/medkit-managers-provider-injection branch from 0c6049f to 4aa1826 Compare May 12, 2026 14:00
…ction timeouts

Three related contract fixes in the ROS 2 transport adapters:

- Ros2LogSource::start() now clears the shutdown latch left by a prior
  stop(). Without this, the subscription lambda short-circuited forever
  after a stop()/start() cycle, breaking the documented idempotent
  start/stop contract on LogSource. New regression test covers the
  restart path alongside the existing post-stop guarantee test.
- Ros2ServiceTransport::call() bounds wait_for_service() by the
  caller-provided timeout instead of a hardcoded 5 s, so a missing
  service fails within the caller's budget (the async call below
  continues to bound itself via future.wait_for()).
- Ros2ActionTransport::send_goal() does the same for the
  action-server availability wait.
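
The restart contract in the first bullet can be sketched without rclcpp. `RestartableLogSource` and `on_message` are hypothetical names; the real adapter receives messages from a /rosout subscription rather than a direct call.

```cpp
#include <atomic>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

class RestartableLogSource {
 public:
  using Callback = std::function<void(const std::string &)>;

  // start() installs the callback and clears the latch left by a prior stop().
  void start(Callback callback) {
    std::lock_guard<std::mutex> lock(mutex_);
    callback_ = std::move(callback);
    shutdown_requested_.store(false, std::memory_order_release);
  }

  // stop() sets the latch first, then waits for any in-flight dispatch to
  // finish before dropping the callback.
  void stop() {
    shutdown_requested_.store(true, std::memory_order_release);
    std::lock_guard<std::mutex> lock(mutex_);
    callback_ = nullptr;
  }

  // Stand-in for the subscription lambda. The mutex is held through dispatch,
  // so no callback runs after stop() has returned.
  void on_message(const std::string & line) {
    if (shutdown_requested_.load(std::memory_order_acquire)) {
      return;
    }
    std::lock_guard<std::mutex> lock(mutex_);
    if (callback_) {
      callback_(line);
    }
  }

 private:
  std::mutex mutex_;
  std::atomic<bool> shutdown_requested_{false};
  Callback callback_;
};
```

Holding the mutex during dispatch means the callback must not call stop() or start() on the same source, or it would deadlock on the non-recursive mutex.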

Copilot AI left a comment

Pull request overview

Copilot reviewed 117 out of 117 changed files in this pull request and generated 6 comments.

Comments suppressed due to low confidence (2)

src/ros2_medkit_gateway/include/ros2_medkit_gateway/trigger_topic_subscriber.hpp:21

  • This shim header prints a #pragma message on include. Because this is a public header, that will surface in downstream build output for every TU and may become unmanageable. Suggest removing the pragma and relying on deprecation via docs / [[deprecated]] attributes (or a macro that can be enabled explicitly).

src/ros2_medkit_gateway/include/ros2_medkit_gateway/configuration_manager.hpp:21

  • This shim uses #pragma message to announce deprecation, which will emit output for every compile that includes it. That can be problematic for downstream users still on the old include path. Consider replacing it with quieter deprecation signaling (docs / [[deprecated]] / opt-in macro) and keep only the forwarding #include.

#pragma once

#pragma message("deprecated: include ros2_medkit_gateway/core/managers/configuration_manager.hpp instead")

// Backwards-compatibility shim - header relocated to core/managers/.
#include "ros2_medkit_gateway/core/managers/configuration_manager.hpp"

Comment on lines 15 to +20
#pragma once

#include <memory>
#include <mutex>
#include <nlohmann/json.hpp>
#include <rclcpp/rclcpp.hpp>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>
#pragma message("deprecated: include ros2_medkit_gateway/core/managers/data_access_manager.hpp instead")

#include "ros2_medkit_gateway/core/data/data_types.hpp"
#include "ros2_medkit_gateway/core/type_introspection.hpp"
#include "ros2_medkit_serialization/json_serializer.hpp"

namespace ros2_medkit_gateway {

using json = nlohmann::json;

class TopicDataProvider; // forward decl, see data/topic_data_provider.hpp

class DataAccessManager {
public:
explicit DataAccessManager(rclcpp::Node * node);

/**
* @brief Publish data to a specific topic
* @param topic_path Full topic path (e.g., /chassis/brakes/command)
* @param msg_type ROS 2 message type (e.g., std_msgs/msg/Float32)
* @param data JSON data to publish
* @param timeout_sec Timeout for the publish operation
* @return JSON with publish status
*/
json publish_to_topic(const std::string & topic_path, const std::string & msg_type, const json & data,
double timeout_sec = 5.0);

/**
* @brief Get topic sample with fallback to metadata on timeout
*
* If the topic is publishing, returns actual data with type info.
* If the topic times out, returns metadata (type, schema, pub/sub counts) instead of error.
*
* @param topic_name Full topic path (e.g., "/powertrain/engine/temperature")
* @param timeout_sec Timeout for data retrieval. Use -1.0 to use the topic_sample_timeout_sec parameter (default)
* @return JSON object with one of two structures:
* - status="data": {topic, timestamp, data, status, type, type_info, publisher_count, subscriber_count}
* - status="metadata_only": {topic, timestamp, status, type, type_info, publisher_count, subscriber_count}
* @throws TopicNotAvailableException if topic doesn't exist or metadata cannot be retrieved
*/
json get_topic_sample_with_fallback(const std::string & topic_name, double timeout_sec = -1.0);

/**
* @brief Get the type introspection instance
*/
TypeIntrospection * get_type_introspection() const {
return type_introspection_.get();
}

/**
* @brief Attach a TopicDataProvider for sampling.
*
* The provider owns the pool-backed subscription path (issue #375 race fix).
* Non-owning pointer; caller retains ownership. Safe to call once at wiring
* time; no concurrent access to the setter.
*/
void set_topic_data_provider(TopicDataProvider * provider) {
topic_data_provider_ = provider;
}

TopicDataProvider * get_topic_data_provider() const {
return topic_data_provider_;
}

/**
* @brief Get single topic sample using native rclcpp APIs
*
* Fast path for single topic sampling with publisher count check.
*
* @param topic_name Full topic path
* @param timeout_sec Timeout for sampling (only used if topic has publishers)
* @return JSON with topic data or metadata
*/
json get_topic_sample_native(const std::string & topic_name, double timeout_sec = 1.0);

/**
* @brief Get the configured topic sample timeout
* @return Timeout in seconds for topic sampling
*/
double get_topic_sample_timeout() const {
return topic_sample_timeout_sec_;
}

private:
/**
* @brief Convert TopicSampleResult to JSON with type info enrichment
*/
json sample_result_to_json(const TopicSampleResult & sample);

/**
* @brief Get or create a cached GenericPublisher for a topic
* @param topic_path Full topic path
* @param msg_type ROS 2 message type
* @return Shared pointer to GenericPublisher
*/
rclcpp::GenericPublisher::SharedPtr get_or_create_publisher(const std::string & topic_path,
const std::string & msg_type);

rclcpp::Node * node_;

/// JSON serializer for native message serialization
std::shared_ptr<ros2_medkit_serialization::JsonSerializer> serializer_;

/// Cached publishers (topic+type -> publisher)
std::unordered_map<std::string, rclcpp::GenericPublisher::SharedPtr> publishers_;

/// Mutex for thread-safe publisher cache access
mutable std::shared_mutex publishers_mutex_;

std::unique_ptr<TypeIntrospection> type_introspection_;
TopicDataProvider * topic_data_provider_{nullptr}; ///< Non-owning; set at wiring time.
double topic_sample_timeout_sec_;

/**
* @brief Get default timeout for topic sampling (from parameter)
*/
double get_default_topic_timeout() const {
return topic_sample_timeout_sec_;
}
};

} // namespace ros2_medkit_gateway
// Backwards-compatibility shim - header relocated to core/managers/.
#include "ros2_medkit_gateway/core/managers/data_access_manager.hpp"

Comment on lines 15 to +20
#pragma once

#include <memory>
#include <mutex>
#include <nlohmann/json.hpp>
#include <rclcpp/rclcpp.hpp>
#include <string>
#include <vector>
#pragma message("deprecated: include ros2_medkit_gateway/core/managers/fault_manager.hpp instead")

#include "ros2_medkit_msgs/msg/environment_data.hpp"
#include "ros2_medkit_msgs/msg/fault.hpp"
#include "ros2_medkit_msgs/srv/clear_fault.hpp"
#include "ros2_medkit_msgs/srv/get_fault.hpp"
#include "ros2_medkit_msgs/srv/get_rosbag.hpp"
#include "ros2_medkit_msgs/srv/get_snapshots.hpp"
#include "ros2_medkit_msgs/srv/list_faults.hpp"
#include "ros2_medkit_msgs/srv/list_rosbags.hpp"
#include "ros2_medkit_msgs/srv/report_fault.hpp"

namespace ros2_medkit_gateway {

using json = nlohmann::json;

/// Result of a fault operation
struct FaultResult {
bool success;
json data;
std::string error_message;
};

/// Result of get_fault operation with full message types
struct FaultWithEnvResult {
bool success;
std::string error_message;
ros2_medkit_msgs::msg::Fault fault;
ros2_medkit_msgs::msg::EnvironmentData environment_data;
};

/// Manager for fault management operations
/// Provides interface to the ros2_medkit_fault_manager services
class FaultManager {
public:
explicit FaultManager(rclcpp::Node * node);

/// Report a fault from a component
/// @param fault_code Unique fault identifier
/// @param severity Fault severity (0=INFO, 1=WARN, 2=ERROR, 3=CRITICAL)
/// @param description Human-readable description
/// @param source_id Component identifier (namespace path)
/// @return FaultResult with success status
FaultResult report_fault(const std::string & fault_code, uint8_t severity, const std::string & description,
const std::string & source_id);

/// Get all faults, optionally filtered by component
/// @param source_id Optional component identifier to filter by (empty = all)
/// @param include_prefailed Include PREFAILED status faults (debounce not yet confirmed)
/// @param include_confirmed Include CONFIRMED status faults
/// @param include_cleared Include CLEARED status faults
/// @param include_healed Include HEALED and PREPASSED status faults
/// @param include_muted Include muted faults (correlation symptoms) in response
/// @param include_clusters Include cluster info in response
/// @return FaultResult with array of faults (and optionally muted_faults and clusters)
FaultResult list_faults(const std::string & source_id = "", bool include_prefailed = true,
bool include_confirmed = true, bool include_cleared = false, bool include_healed = false,
bool include_muted = false, bool include_clusters = false);

/// Get a specific fault by code with environment data
/// @param fault_code Fault identifier
/// @param source_id Optional component identifier to verify fault belongs to component
/// @return FaultWithEnvResult with fault and environment_data, or error if not found
FaultWithEnvResult get_fault_with_env(const std::string & fault_code, const std::string & source_id = "");

/// Get a specific fault by code (JSON result - legacy)
/// @param fault_code Fault identifier
/// @param source_id Optional component identifier to verify fault belongs to component
/// @return FaultResult with fault data or error if not found
/// @note Thread-safe: delegates to get_fault_with_env() which acquires get_mutex_.
/// Do NOT call this method while holding get_mutex_.
FaultResult get_fault(const std::string & fault_code, const std::string & source_id = "");

/// Clear a fault
/// @param fault_code Fault identifier to clear
/// @return FaultResult with success status
FaultResult clear_fault(const std::string & fault_code);

/// Get snapshots for a fault
/// @param fault_code Fault identifier
/// @param topic Optional topic filter (empty = all topics)
/// @return FaultResult with snapshot data (JSON in data field)
FaultResult get_snapshots(const std::string & fault_code, const std::string & topic = "");

/// Get rosbag file info for a fault
/// @param fault_code Fault identifier
/// @return FaultResult with rosbag file path and metadata
FaultResult get_rosbag(const std::string & fault_code);

/// Get all rosbag files for an entity (batch operation)
/// @param entity_fqn Entity fully qualified name for prefix matching
/// @return FaultResult with arrays of rosbag metadata
FaultResult list_rosbags(const std::string & entity_fqn);

/// Check if fault manager service is available
/// @return true if services are available
bool is_available() const;

/// Convert Fault message to JSON (static utility for reuse by SSE handler)
static json fault_to_json(const ros2_medkit_msgs::msg::Fault & fault);

private:
/// Wait for services to become available
bool wait_for_services(std::chrono::duration<double> timeout);

rclcpp::Node * node_;

/// Service clients
rclcpp::Client<ros2_medkit_msgs::srv::ReportFault>::SharedPtr report_fault_client_;
rclcpp::Client<ros2_medkit_msgs::srv::GetFault>::SharedPtr get_fault_client_;
rclcpp::Client<ros2_medkit_msgs::srv::ListFaults>::SharedPtr list_faults_client_;
rclcpp::Client<ros2_medkit_msgs::srv::ClearFault>::SharedPtr clear_fault_client_;
rclcpp::Client<ros2_medkit_msgs::srv::GetSnapshots>::SharedPtr get_snapshots_client_;
rclcpp::Client<ros2_medkit_msgs::srv::GetRosbag>::SharedPtr get_rosbag_client_;
rclcpp::Client<ros2_medkit_msgs::srv::ListRosbags>::SharedPtr list_rosbags_client_;

/// Service timeout
double service_timeout_sec_{5.0};
std::string fault_manager_base_path_{"/fault_manager"};

/// Per-client mutexes for thread-safe service calls.
/// Split by service client so that read operations (list, get) are not blocked
/// by slow write operations (report_fault with snapshot capture).
mutable std::mutex report_mutex_;
mutable std::mutex list_mutex_;
mutable std::mutex get_mutex_;
mutable std::mutex clear_mutex_;
mutable std::mutex snapshots_mutex_;
mutable std::mutex rosbag_mutex_;
mutable std::mutex list_rosbags_mutex_;
};

} // namespace ros2_medkit_gateway
// Backwards-compatibility shim - header relocated to core/managers/.
#include "ros2_medkit_gateway/core/managers/fault_manager.hpp"
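The header above ties `FaultManager` to `rclcpp::Node` and seven service clients. As a rough illustration of the Transport-adapter direction this PR describes, the sketch below injects a small interface into the manager so a unit test can substitute a mock. All names here (`FaultTransportSketch`, `FaultManagerSketch`, `MockFaultTransport`, `SketchFaultResult`) are hypothetical and are not the PR's actual types. Rejecting severities above 3 is an assumption based on the documented 0 to 3 range.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical result type, loosely modelled on FaultResult.
struct SketchFaultResult {
  bool success;
  std::string error_message;
};

// Hypothetical transport interface: the only layer that would talk to ROS 2.
class FaultTransportSketch {
 public:
  virtual ~FaultTransportSketch() = default;
  virtual SketchFaultResult report_fault(const std::string & fault_code, std::uint8_t severity,
                                         const std::string & description,
                                         const std::string & source_id) = 0;
};

// The manager keeps validation and routing and never includes rclcpp.
class FaultManagerSketch {
 public:
  explicit FaultManagerSketch(FaultTransportSketch & transport) : transport_(transport) {}

  SketchFaultResult report_fault(const std::string & fault_code, std::uint8_t severity,
                                 const std::string & description, const std::string & source_id) {
    // Assumption: severities outside the documented 0..3 range are rejected locally.
    if (severity > 3) {
      return {false, "severity out of range: " + std::to_string(severity)};
    }
    return transport_.report_fault(fault_code, severity, description, source_id);
  }

 private:
  FaultTransportSketch & transport_;
};

// Mock transport for unit tests: records the fault codes it receives and reports success.
class MockFaultTransport : public FaultTransportSketch {
 public:
  SketchFaultResult report_fault(const std::string & fault_code, std::uint8_t, const std::string &,
                                 const std::string &) override {
    reported_codes.push_back(fault_code);
    return {true, ""};
  }

  std::vector<std::string> reported_codes;
};
```

A test built this way constructs `MockFaultTransport`, passes it to the manager, and asserts on `reported_codes`. It needs no executor or live ROS 2 graph.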
Comment on lines +69 to +75
}
if (sample.status == "data" && !sample.data.is_null()) {
out["data"] = sample.data;
out["timestamp"] =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
}
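The hunk above stamps the sample with wall-clock milliseconds taken inline from `std::chrono::system_clock::now()`. One possible way to make that value testable is to separate the conversion from the clock read, as in this minimal sketch. The helper names `epoch_millis` and `now_epoch_millis` are hypothetical and do not appear in the PR.

```cpp
#include <chrono>
#include <cstdint>

// Milliseconds since the system_clock epoch for an arbitrary time point.
inline std::int64_t epoch_millis(std::chrono::system_clock::time_point tp) {
  return std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch()).count();
}

// Convenience wrapper that reads the wall clock, matching the inline expression in the hunk.
inline std::int64_t now_epoch_millis() {
  return epoch_millis(std::chrono::system_clock::now());
}
```

With this split, a test can pass a fixed `time_point` to `epoch_millis` and compare against a known value, while production code calls `now_epoch_millis()`.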
Comment on lines +112 to +122
// Convert the fractional-second timeout to whole milliseconds, clamped at zero.
const auto timeout_ms =
std::chrono::milliseconds{static_cast<std::int64_t>(std::max(timeout.count(), 0.0) * 1000.0)};
auto future_status = future_and_id.wait_for(timeout_ms);

if (future_status != std::future_status::ready) {
client->remove_pending_request(future_and_id.request_id);
ros2_medkit_serialization::destroy_ros_message(&ros_request);
result.success = false;
// Report the timeout in milliseconds so sub-second values are not truncated to 0.
result.error_message =
"Service call timed out (" + std::to_string(timeout_ms.count()) + "ms): " + service_path;
return result;
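The timeout handling in this hunk converts a `std::chrono::duration<double>` in seconds to whole milliseconds and clamps negative values to zero. A minimal standalone sketch of that conversion follows, with a message formatter that reports milliseconds so sub-second timeouts stay visible. The function names `to_wait_millis` and `timeout_message` are hypothetical.

```cpp
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <string>

// Convert fractional seconds to whole milliseconds; negative timeouts become zero.
inline std::chrono::milliseconds to_wait_millis(std::chrono::duration<double> timeout) {
  const double clamped_sec = std::max(timeout.count(), 0.0);
  return std::chrono::milliseconds{static_cast<std::int64_t>(clamped_sec * 1000.0)};
}

// Build the timeout error text using the millisecond value that was actually waited on.
inline std::string timeout_message(std::chrono::duration<double> timeout,
                                   const std::string & service_path) {
  return "Service call timed out (" + std::to_string(to_wait_millis(timeout).count()) +
         "ms): " + service_path;
}
```

Formatting from the converted value means the message and the actual wait share one source of truth. A 0.5 s timeout reads as `500ms` rather than being rounded down to whole seconds.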
Comment on lines 15 to +20
#pragma once

#include <action_msgs/msg/goal_status_array.hpp>
#include <array>
#include <chrono>
#include <map>
#include <memory>
#include <mutex>
#include <nlohmann/json.hpp>
#include <optional>
#include <random>
#include <rclcpp/rclcpp.hpp>
#include <shared_mutex>
#include <string>
#include <vector>
#pragma message("deprecated: include ros2_medkit_gateway/core/managers/operation_manager.hpp instead")

#include "ros2_medkit_gateway/compat/generic_client_compat.hpp"
#include "ros2_medkit_gateway/core/discovery/models/common.hpp"
#include "ros2_medkit_gateway/discovery/discovery_manager.hpp"
#include "ros2_medkit_serialization/json_serializer.hpp"
#include "ros2_medkit_serialization/service_action_types.hpp"

namespace ros2_medkit_gateway {

class ResourceChangeNotifier;

using json = nlohmann::json;

/// Result of a synchronous service call
struct ServiceCallResult {
bool success;
json response;
std::string error_message;
};

/// Action goal status (matches ROS2 action_msgs/msg/GoalStatus)
enum class ActionGoalStatus : int8_t {
UNKNOWN = 0,
ACCEPTED = 1,
EXECUTING = 2,
CANCELING = 3,
SUCCEEDED = 4,
CANCELED = 5,
ABORTED = 6
};

/// Convert status enum to string
std::string action_status_to_string(ActionGoalStatus status);

/// Result of sending an action goal
struct ActionSendGoalResult {
bool success;
std::string goal_id; // UUID hex string
bool goal_accepted;
std::string error_message;
};

/// Result of canceling an action goal
struct ActionCancelResult {
bool success;
int8_t return_code; // 0=accepted, 1=rejected, 2=unknown_id, 3=terminated
std::string error_message;
};

/// Result of getting action result
struct ActionGetResultResult {
bool success;
ActionGoalStatus status;
json result;
std::string error_message;
};

/// Tracked action goal info (stored locally)
struct ActionGoalInfo {
std::string goal_id;
std::string action_path; // e.g., /powertrain/engine/long_calibration
std::string action_type; // e.g., example_interfaces/action/Fibonacci
std::string entity_id; // SOVD entity ID (e.g., engine) for trigger notifications
ActionGoalStatus status;
json last_feedback;
std::chrono::system_clock::time_point created_at;
std::chrono::system_clock::time_point last_update;
};

/// Manager for ROS2 operations (services and actions)
/// Handles service calls synchronously and action calls asynchronously
class OperationManager {
public:
explicit OperationManager(rclcpp::Node * node, DiscoveryManager * discovery_manager);

~OperationManager();

OperationManager(const OperationManager &) = delete;
OperationManager & operator=(const OperationManager &) = delete;
OperationManager(OperationManager &&) = delete;
OperationManager & operator=(OperationManager &&) = delete;

/// Explicitly release subscriptions, clients, and tracked goals.
/// Call while executor is still running to allow safe callback cleanup.
/// Called automatically by destructor, but GatewayNode calls it earlier
/// during its shutdown sequence.
void shutdown();

/// Set optional notifier for broadcasting operation status changes to trigger subsystem.
void set_notifier(ResourceChangeNotifier * notifier);

/// Call a ROS2 service synchronously
/// @param service_path Full service path (e.g., "/powertrain/engine/calibrate")
/// @param service_type Service type (e.g., "std_srvs/srv/Trigger")
/// @param request JSON request body
/// @return ServiceCallResult with response or error
ServiceCallResult call_service(const std::string & service_path, const std::string & service_type,
const json & request);

/// Find and call a service by component and operation name
/// Uses discovery cache to resolve service path and type if not provided
/// @param component_ns Component namespace (e.g., "/powertrain/engine")
/// @param operation_name Operation name (e.g., "calibrate")
/// @param service_type Optional service type override
/// @param request JSON request body
/// @return ServiceCallResult with response or error
ServiceCallResult call_component_service(const std::string & component_ns, const std::string & operation_name,
const std::optional<std::string> & service_type, const json & request);

/// Validate message type format (package/srv/Type or package/action/Type)
static bool is_valid_message_type(const std::string & type);

/// Validate UUID hex string format (32 hex characters)
static bool is_valid_uuid_hex(const std::string & uuid_hex);

/// Check if type is a service type (contains /srv/)
static bool is_service_type(const std::string & type);

/// Check if type is an action type (contains /action/)
static bool is_action_type(const std::string & type);

// ==================== ACTION OPERATIONS ====================

/// Send a goal to an action server using native rclcpp_action internal services
/// @param action_path Full action path (e.g., "/powertrain/engine/long_calibration")
/// @param action_type Action type (e.g., "example_interfaces/action/Fibonacci")
/// @param goal JSON goal data
/// @param entity_id SOVD entity ID for trigger notifications (e.g., "engine")
/// @return ActionSendGoalResult with goal_id or error
ActionSendGoalResult send_action_goal(const std::string & action_path, const std::string & action_type,
const json & goal, const std::string & entity_id = "");

/// Send a goal to an action using component namespace and operation name
/// Uses discovery cache to resolve action path and type if not provided
/// @param component_ns Component namespace (e.g., "/powertrain/engine")
/// @param operation_name Operation name (e.g., "long_calibration")
/// @param action_type Optional action type override
/// @param goal JSON goal data
/// @param entity_id SOVD entity ID for trigger notifications (e.g., "engine")
/// @return ActionSendGoalResult with goal_id or error
ActionSendGoalResult send_component_action_goal(const std::string & component_ns, const std::string & operation_name,
const std::optional<std::string> & action_type, const json & goal,
const std::string & entity_id = "");

/// Cancel a running action goal using ros2 action cancel
/// @param action_path Full action path
/// @param goal_id Goal UUID hex string
/// @return ActionCancelResult
ActionCancelResult cancel_action_goal(const std::string & action_path, const std::string & goal_id);

/// Get the result of a completed action
/// This is a blocking call that waits for the action to complete
/// @param action_path Full action path
/// @param action_type Action type
/// @param goal_id Goal UUID hex string
/// @return ActionGetResultResult with result or error
ActionGetResultResult get_action_result(const std::string & action_path, const std::string & action_type,
const std::string & goal_id);

/// Get tracked goal info by goal_id
/// @param goal_id Goal UUID hex string
/// @return Optional ActionGoalInfo if found
std::optional<ActionGoalInfo> get_tracked_goal(const std::string & goal_id) const;

/// List all tracked goals
/// @return Vector of all tracked goals
std::vector<ActionGoalInfo> list_tracked_goals() const;

/// Get all goals for a specific action path
/// @param action_path Full action path (e.g., "/powertrain/engine/long_calibration")
/// @return Vector of goals for that action, sorted by last_update (newest first)
std::vector<ActionGoalInfo> get_goals_for_action(const std::string & action_path) const;

/// Get the most recent goal for a specific action path
/// @param action_path Full action path
/// @return Optional ActionGoalInfo if any goal exists for this action
std::optional<ActionGoalInfo> get_latest_goal_for_action(const std::string & action_path) const;

/// Update goal status in tracking
/// @param goal_id Goal UUID hex string
/// @param status New status
void update_goal_status(const std::string & goal_id, ActionGoalStatus status);

/// Update goal feedback in tracking
/// @param goal_id Goal UUID hex string
/// @param feedback New feedback JSON
void update_goal_feedback(const std::string & goal_id, const json & feedback);

/// Remove completed goals older than specified duration
/// @param max_age Maximum age of completed goals to keep
void cleanup_old_goals(std::chrono::seconds max_age = std::chrono::seconds(300));

/// Subscribe to action status topic for real-time updates
/// Called automatically when a goal is sent
/// @param action_path Full action path (e.g., "/powertrain/engine/long_calibration")
void subscribe_to_action_status(const std::string & action_path);

/// Unsubscribe from action status topic
/// Called when no more active goals exist for this action
/// @param action_path Full action path
void unsubscribe_from_action_status(const std::string & action_path);

private:
/// Set of clients for an action (internal services)
struct ActionClientSet {
compat::GenericServiceClient::SharedPtr send_goal_client;
compat::GenericServiceClient::SharedPtr get_result_client;
compat::GenericServiceClient::SharedPtr cancel_goal_client;
std::string action_type; // Store type for later use
};

/// Convert UUID hex string to JSON array of byte values
json uuid_hex_to_json_array(const std::string & uuid_hex);

/// Generate a random UUID
std::array<uint8_t, 16> generate_uuid();

/// Convert UUID bytes to JSON array
json uuid_bytes_to_json_array(const std::array<uint8_t, 16> & uuid);

/// Track a new goal
void track_goal(const std::string & goal_id, const std::string & action_path, const std::string & action_type,
const std::string & entity_id);

/// Get or create a cached GenericServiceClient for a service
compat::GenericServiceClient::SharedPtr get_or_create_service_client(const std::string & service_path,
const std::string & service_type);

/// Get or create cached action clients for an action
ActionClientSet & get_or_create_action_clients(const std::string & action_path, const std::string & action_type);

/// Make cache key from service path and type
static std::string make_client_key(const std::string & service_path, const std::string & service_type);

rclcpp::Node * node_;
DiscoveryManager * discovery_manager_;
ResourceChangeNotifier * notifier_ = nullptr;

/// Random number generator for UUID generation
std::mt19937 rng_;
std::mutex rng_mutex_;

/// Native JSON serializer for service calls
std::shared_ptr<ros2_medkit_serialization::JsonSerializer> serializer_;

/// Cache for GenericServiceClient instances (key = "service_path|service_type")
mutable std::shared_mutex clients_mutex_;
std::map<std::string, compat::GenericServiceClient::SharedPtr> generic_clients_;

/// Cache for action client sets (key = action_path)
std::map<std::string, ActionClientSet> action_clients_;

/// Timeout for service calls in seconds (configurable via service_call_timeout_sec param)
int service_call_timeout_sec_;

/// Map of goal_id -> ActionGoalInfo for tracking active goals
mutable std::mutex goals_mutex_;
std::map<std::string, ActionGoalInfo> tracked_goals_;

/// Callback for action status topic updates
void on_action_status(const std::string & action_path, const action_msgs::msg::GoalStatusArray::ConstSharedPtr & msg);

/// Convert goal UUID bytes to hex string
std::string uuid_bytes_to_hex(const std::array<uint8_t, 16> & uuid) const;

/// Map of action_path -> status subscription
mutable std::mutex subscriptions_mutex_;
std::map<std::string, rclcpp::Subscription<action_msgs::msg::GoalStatusArray>::SharedPtr> status_subscriptions_;
};

} // namespace ros2_medkit_gateway
// Backwards-compatibility shim - header relocated to core/managers/.
#include "ros2_medkit_gateway/core/managers/operation_manager.hpp"
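The `OperationManager` header declares several UUID helpers (`is_valid_uuid_hex`, `uuid_bytes_to_hex`, `uuid_hex_to_json_array`) without showing their bodies. The sketch below illustrates one plausible shape for the validation and hex conversion, assuming the documented format of 32 hex characters for a 16-byte goal ID. The `_sketch` names, `hex_nibble`, and the lower-case output are assumptions, not the PR's implementation.

```cpp
#include <array>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

// True when the string is exactly 32 hexadecimal characters.
inline bool is_valid_uuid_hex_sketch(const std::string & uuid_hex) {
  if (uuid_hex.size() != 32) {
    return false;
  }
  for (unsigned char c : uuid_hex) {
    if (!std::isxdigit(c)) {
      return false;
    }
  }
  return true;
}

// Encode 16 bytes as 32 lower-case hex characters.
inline std::string uuid_bytes_to_hex_sketch(const std::array<std::uint8_t, 16> & uuid) {
  static const char kDigits[] = "0123456789abcdef";
  std::string out;
  out.reserve(32);
  for (std::uint8_t byte : uuid) {
    out.push_back(kDigits[byte >> 4]);
    out.push_back(kDigits[byte & 0x0F]);
  }
  return out;
}

// Value of one hex digit; the caller must have validated the character first.
inline int hex_nibble(char c) {
  if (c >= '0' && c <= '9') {
    return c - '0';
  }
  if (c >= 'a' && c <= 'f') {
    return c - 'a' + 10;
  }
  return c - 'A' + 10;
}

// Decode 32 hex characters into 16 bytes; std::nullopt on malformed input.
inline std::optional<std::array<std::uint8_t, 16>> uuid_hex_to_bytes_sketch(const std::string & uuid_hex) {
  if (!is_valid_uuid_hex_sketch(uuid_hex)) {
    return std::nullopt;
  }
  std::array<std::uint8_t, 16> out{};
  for (std::size_t i = 0; i < out.size(); ++i) {
    out[i] = static_cast<std::uint8_t>((hex_nibble(uuid_hex[2 * i]) << 4) | hex_nibble(uuid_hex[2 * i + 1]));
  }
  return out;
}
```

Validating before decoding keeps `hex_nibble` simple. It also gives callers one check to run on goal IDs arriving over HTTP before any tracked-goal lookup.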
Comment on lines 15 to +20
#pragma once

#include <atomic>
#include <deque>
#include <functional>
#include <mutex>
#include <nlohmann/json.hpp>
#include <optional>
#include <rcl_interfaces/msg/log.hpp>
#include <rclcpp/rclcpp.hpp>
#include <string>
#include <tl/expected.hpp>
#include <unordered_map>
#include <vector>
#pragma message("deprecated: include ros2_medkit_gateway/core/managers/log_manager.hpp instead")

#include "ros2_medkit_gateway/core/log_types.hpp"
#include "ros2_medkit_gateway/core/providers/log_provider.hpp"

namespace ros2_medkit_gateway {

using json = nlohmann::json;

class PluginManager; // forward declaration — full include in .cpp
class ResourceChangeNotifier; // forward declaration

/**
* @brief Manages /rosout log collection via the default ring-buffer backend.
*
* Subscribes to /rosout, normalizes logger names (strips leading '/'),
* and stores entries per node-name in fixed-size deque ring buffers.
*
* Plugin integration (two modes):
* - **Observer mode** (default): If a LogProvider plugin is registered, get_logs()
* and get_config() delegate to it. on_log_entry() is called on ALL LogProvider
* observers for every /rosout message. Observers returning true suppress ring-buffer
* storage.
* - **Full ingestion** (manages_ingestion() == true): The primary LogProvider owns
* the entire log pipeline. LogManager skips the /rosout subscription and ring buffer
* entirely. All queries and config operations delegate to the plugin.
*
* FQN normalization:
* - entity.fqn from the entity cache has a leading '/' (e.g. "/powertrain/engine/temp_sensor")
* - /rosout msg.name does NOT have a leading '/' (rcl_node_get_logger_name convention)
* - Callers pass raw FQNs from entity.fqn; LogManager strips leading '/' internally.
*/
class LogManager {
public:
/// Default maximum number of entries retained per node in the ring buffer
static constexpr size_t kDefaultBufferSize = 200;

/**
* @brief Construct LogManager
*
* If the primary LogProvider's manages_ingestion() returns true, the /rosout
* subscription and ring buffer are skipped entirely. Otherwise subscribes to
* /rosout as usual.
*
* @param node ROS 2 node (used for subscription and logging)
* @param plugin_mgr PluginManager for LogProvider lookup (may be nullptr)
* @param max_buffer_size Ring buffer size per node (override for unit testing)
*/
explicit LogManager(rclcpp::Node * node, PluginManager * plugin_mgr = nullptr,
size_t max_buffer_size = kDefaultBufferSize);

~LogManager();

LogManager(const LogManager &) = delete;
LogManager & operator=(const LogManager &) = delete;
LogManager(LogManager &&) = delete;
LogManager & operator=(LogManager &&) = delete;

/**
* @brief Query log entries for a set of node FQNs
*
* If a LogProvider plugin is registered, delegates to it.
* Otherwise uses the local ring buffer.
*
* @param node_fqns Node FQNs from entity.fqn (WITH leading '/' — normalized internally)
* @param prefix_match If true, match all buffered nodes whose name starts with the given prefix
* (used for Component queries). If false, exact match (App queries).
* @param min_severity Additional severity filter from query parameter. Empty = no override.
* @param context_filter Substring filter applied to log entry's name (logger name). Empty = no filter.
* @param entity_id Entity ID for config lookup. Empty = use defaults.
* @return JSON array of LogEntry objects sorted by id ascending, capped at entity config max_entries.
*/
tl::expected<json, std::string> get_logs(const std::vector<std::string> & node_fqns, bool prefix_match,
const std::string & min_severity, const std::string & context_filter,
const std::string & entity_id);

/// Get current log configuration for entity (returns defaults if unconfigured)
tl::expected<LogConfig, std::string> get_config(const std::string & entity_id) const;

/**
* @brief Update log configuration for an entity
* @return Empty string on success, error message on validation failure
*/
std::string update_config(const std::string & entity_id, const std::optional<std::string> & severity_filter,
const std::optional<size_t> & max_entries);

/**
* @brief Programmatically add a log entry (e.g. from trigger log_settings)
*
* Creates a LogEntry and pushes it to the internal ring buffer using the
* same path as on_rosout(). If a ResourceChangeNotifier is set, emits a
* "logs" CREATED notification so triggers can observe log changes.
*
* @param entity_id Entity to associate the log with (used as logger name)
* @param severity SOVD severity string (debug, info, warning, error, fatal)
* @param message Human-readable log message
* @param metadata Additional JSON metadata stored in the message (appended)
*/
void add_log_entry(const std::string & entity_id, const std::string & severity, const std::string & message,
const nlohmann::json & metadata);

/// Set the ResourceChangeNotifier for emitting log change events.
/// Called by GatewayNode after both LogManager and the notifier are available.
void set_notifier(ResourceChangeNotifier * notifier);

/// Callback that maps a ROS 2 node FQN to a manifest entity ID.
/// Returns empty string if the FQN cannot be resolved.
using NodeToEntityFn = std::function<std::string(const std::string &)>;

/// Set the node-to-entity resolver for trigger notifications.
/// When set, on_rosout() resolves node names to manifest entity IDs before
/// notifying the ResourceChangeNotifier.
void set_node_to_entity_resolver(NodeToEntityFn resolver);

// ---- Static utilities (no ROS 2 required — safe in unit tests) ----

/// Convert ROS 2 uint8 log level -> SOVD severity string ("debug" for unknown levels)
static std::string level_to_severity(uint8_t level);

/// Convert SOVD severity string -> ROS 2 uint8 log level (0 for invalid/empty)
static uint8_t severity_to_level(const std::string & severity);

/// Check if a severity string is valid (one of: debug, info, warning, error, fatal)
static bool is_valid_severity(const std::string & severity);

/// Format a LogEntry as SOVD JSON (id, timestamp, severity, message, context)
static json entry_to_json(const LogEntry & entry);

/// Strip leading '/' from a node FQN for ring-buffer key normalization
static std::string normalize_fqn(const std::string & fqn);

// ---- Test injection (for unit tests — do not use in production code) ----

/**
* @brief Inject a log entry directly into the ring buffer (bypasses /rosout subscription)
*
* Used by unit tests to populate the buffer without a live ROS 2 graph.
* In production the buffer is populated exclusively by the /rosout callback.
*/
void inject_entry_for_testing(LogEntry entry);

private:
void on_rosout(const rcl_interfaces::msg::Log::ConstSharedPtr & msg);

/// Get the effective LogProvider: plugin if registered, else nullptr (use local buffer)
LogProvider * effective_provider() const;

rclcpp::Node * node_;
PluginManager * plugin_mgr_;
ResourceChangeNotifier * notifier_ = nullptr;
/// Write-once: must be set before ROS executor starts spinning.
/// After that, only read from subscription callbacks (no mutex needed).
NodeToEntityFn node_to_entity_resolver_;
size_t max_buffer_size_;

rclcpp::Subscription<rcl_interfaces::msg::Log>::SharedPtr rosout_sub_;

// Ring buffers keyed by normalized node name (no leading '/')
// e.g. "powertrain/engine/temp_sensor" -> deque<LogEntry>
std::unordered_map<std::string, std::deque<LogEntry>> buffers_;
mutable std::mutex buffers_mutex_;

// Per-entity configuration keyed by entity_id
std::unordered_map<std::string, LogConfig> configs_;
mutable std::mutex configs_mutex_;

// Monotonically increasing entry ID (never resets; starts at 1)
std::atomic<int64_t> next_id_{1};
};

} // namespace ros2_medkit_gateway
// Backwards-compatibility shim - header relocated to core/managers/.
#include "ros2_medkit_gateway/core/managers/log_manager.hpp"
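The `LogManager` doc comments describe three pieces of ROS-free logic: stripping the leading `/` from node FQNs, mapping ROS 2 log levels to SOVD severity strings, and keeping a fixed-size per-node ring buffer with monotonically increasing IDs. The sketch below shows how those could fit together in a class with no `rclcpp` dependency. The type and function names are hypothetical, locking is omitted for brevity, and the level constants follow `rcl_interfaces/msg/Log` (DEBUG=10, INFO=20, WARN=30, ERROR=40, FATAL=50).

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <unordered_map>

// Strip a single leading '/' so entity FQNs and /rosout logger names share one key format.
inline std::string normalize_fqn_sketch(const std::string & fqn) {
  return (!fqn.empty() && fqn.front() == '/') ? fqn.substr(1) : fqn;
}

// Map a ROS 2 log level to an SOVD severity string; unknown levels fall back to "debug".
inline std::string level_to_severity_sketch(std::uint8_t level) {
  switch (level) {
    case 20: return "info";
    case 30: return "warning";
    case 40: return "error";
    case 50: return "fatal";
    default: return "debug";  // covers DEBUG (10) and unrecognised values
  }
}

// Hypothetical, trimmed-down log entry.
struct SketchLogEntry {
  std::int64_t id;
  std::string severity;
  std::string message;
};

// Per-node ring buffers keyed by normalized node name (single-threaded sketch, no mutex).
class RingBufferSketch {
 public:
  explicit RingBufferSketch(std::size_t max_per_node) : max_per_node_(max_per_node) {}

  void push(const std::string & node_fqn, std::uint8_t level, const std::string & message) {
    auto & buffer = buffers_[normalize_fqn_sketch(node_fqn)];
    buffer.push_back({next_id_++, level_to_severity_sketch(level), message});
    if (buffer.size() > max_per_node_) {
      buffer.pop_front();  // drop the oldest entry once the cap is exceeded
    }
  }

  // Returns nullptr when nothing has been buffered for the node.
  const std::deque<SketchLogEntry> * find(const std::string & node_fqn) const {
    auto it = buffers_.find(normalize_fqn_sketch(node_fqn));
    return it == buffers_.end() ? nullptr : &it->second;
  }

 private:
  std::size_t max_per_node_;
  std::int64_t next_id_{1};  // starts at 1 and never resets
  std::unordered_map<std::string, std::deque<SketchLogEntry>> buffers_;
};
```

Because both `push` and `find` normalize their key, a caller can pass either `/a/b` or `a/b` and reach the same buffer. That is the FQN convention the header comment describes.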
Development

Successfully merging this pull request may close these issues.

Decouple gateway managers from rclcpp via provider injection for testability and reduced flakiness
